This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 78cb5207c2 Remove usage of Utils.getTableNameLock() & add concurrency test cases (#5822)
78cb5207c2 is described below

commit 78cb5207c29bbc8ac3e8068f0a64c03d59ad007f
Author: Dom G. <[email protected]>
AuthorDate: Fri Aug 29 12:18:31 2025 -0400

    Remove usage of Utils.getTableNameLock() & add concurrency test cases (#5822)
    
    * Remove Utils.getTableNameLock() and all places it was used
    * Add test cases with concurrent operations to ensure the places that the locks were removed handle things properly without the locks
---
 .../apache/accumulo/manager/tableOps/Utils.java    |   8 -
 .../manager/tableOps/clone/CloneZookeeper.java     |  23 +-
 .../create/PopulateZookeeperWithNamespace.java     |  21 +-
 .../tableOps/namespace/rename/RenameNamespace.java |   2 -
 .../manager/tableOps/rename/RenameTable.java       |   2 -
 .../tableImport/ImportPopulateZookeeper.java       |  18 +-
 .../apache/accumulo/test/TableOperationsIT.java    |  14 +-
 .../ConcurrentTableNameOperationsIT.java           | 426 +++++++++++++++++++++
 8 files changed, 451 insertions(+), 63 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
index a88afc6ee4..60eb2c4d74 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
@@ -29,8 +29,6 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 
 import org.apache.accumulo.core.Constants;
@@ -89,8 +87,6 @@ public class Utils {
     }
   }
 
-  static final Lock tableNameLock = new ReentrantLock();
-
   private static KeyExtent findContaining(Ample ample, TableId tableId, Text 
row) {
     Objects.requireNonNull(row);
     try (var tablets = ample.readTablets().forTable(tableId).overlapping(row, 
true, row)
@@ -315,10 +311,6 @@ public class Utils {
     return lock;
   }
 
-  public static Lock getTableNameLock() {
-    return tableNameLock;
-  }
-
   public static DistributedLock getReadLock(Manager env, AbstractId<?> id, 
FateId fateId,
       LockRange range) {
     return Utils.getLock(env.getContext(), id, fateId, LockType.READ, range);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java
index b3d47e5f1c..40e17990f5 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java
@@ -53,21 +53,16 @@ class CloneZookeeper extends ManagerRepo {
 
   @Override
   public Repo<Manager> call(FateId fateId, Manager environment) throws 
Exception {
-    Utils.getTableNameLock().lock();
-    try {
-      var context = environment.getContext();
-      // write tableName & tableId, first to Table Mapping and then to 
Zookeeper
-      
context.getTableMapping(cloneInfo.getNamespaceId()).put(cloneInfo.getTableId(),
-          cloneInfo.getTableName(), TableOperation.CLONE);
-      environment.getTableManager().cloneTable(cloneInfo.getSrcTableId(), 
cloneInfo.getTableId(),
-          cloneInfo.getTableName(), cloneInfo.getNamespaceId(), 
cloneInfo.getPropertiesToSet(),
-          cloneInfo.getPropertiesToExclude());
-      context.clearTableListCache();
+    var context = environment.getContext();
+    // write tableName & tableId, first to Table Mapping and then to Zookeeper
+    
context.getTableMapping(cloneInfo.getNamespaceId()).put(cloneInfo.getTableId(),
+        cloneInfo.getTableName(), TableOperation.CLONE);
+    environment.getTableManager().cloneTable(cloneInfo.getSrcTableId(), 
cloneInfo.getTableId(),
+        cloneInfo.getTableName(), cloneInfo.getNamespaceId(), 
cloneInfo.getPropertiesToSet(),
+        cloneInfo.getPropertiesToExclude());
+    context.clearTableListCache();
 
-      return new CloneMetadata(cloneInfo);
-    } finally {
-      Utils.getTableNameLock().unlock();
-    }
+    return new CloneMetadata(cloneInfo);
   }
 
   @Override
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/PopulateZookeeperWithNamespace.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/PopulateZookeeperWithNamespace.java
index b1a91f6e17..654a117daa 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/PopulateZookeeperWithNamespace.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/PopulateZookeeperWithNamespace.java
@@ -48,22 +48,17 @@ class PopulateZookeeperWithNamespace extends ManagerRepo {
   @Override
   public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
 
-    Utils.getTableNameLock().lock();
-    try {
-      var context = manager.getContext();
-      context.getNamespaceMapping().put(namespaceInfo.namespaceId, 
namespaceInfo.namespaceName);
-      
context.getTableManager().prepareNewNamespaceState(namespaceInfo.namespaceId,
-          namespaceInfo.namespaceName, NodeExistsPolicy.OVERWRITE);
+    var context = manager.getContext();
+    context.getNamespaceMapping().put(namespaceInfo.namespaceId, 
namespaceInfo.namespaceName);
+    
context.getTableManager().prepareNewNamespaceState(namespaceInfo.namespaceId,
+        namespaceInfo.namespaceName, NodeExistsPolicy.OVERWRITE);
 
-      PropUtil.setProperties(context, 
NamespacePropKey.of(namespaceInfo.namespaceId),
-          namespaceInfo.props);
+    PropUtil.setProperties(context, 
NamespacePropKey.of(namespaceInfo.namespaceId),
+        namespaceInfo.props);
 
-      context.clearTableListCache();
+    context.clearTableListCache();
 
-      return new FinishCreateNamespace(namespaceInfo);
-    } finally {
-      Utils.getTableNameLock().unlock();
-    }
+    return new FinishCreateNamespace(namespaceInfo);
   }
 
   @Override
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/rename/RenameNamespace.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/rename/RenameNamespace.java
index ccb8669d1b..6e9161e39e 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/rename/RenameNamespace.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/rename/RenameNamespace.java
@@ -50,12 +50,10 @@ public class RenameNamespace extends ManagerRepo {
   @Override
   public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
 
-    Utils.getTableNameLock().lock();
     try {
       manager.getContext().getNamespaceMapping().rename(namespaceId, oldName, 
newName);
       manager.getContext().clearTableListCache();
     } finally {
-      Utils.getTableNameLock().unlock();
       Utils.unreserveNamespace(manager, namespaceId, fateId, LockType.WRITE);
     }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java
index 9e0fe1711f..e0c078958c 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java
@@ -62,14 +62,12 @@ public class RenameTable extends ManagerRepo {
     String newSimpleTableName = 
TableNameUtil.qualify(newTableName).getSecond();
     var context = manager.getContext();
 
-    Utils.getTableNameLock().lock();
     try {
       context.getTableMapping(namespaceId).rename(tableId, 
qualifiedOldTableName.getSecond(),
           newSimpleTableName);
 
       context.clearTableListCache();
     } finally {
-      Utils.getTableNameLock().unlock();
       Utils.unreserveTable(manager, tableId, fateId, LockType.WRITE);
       Utils.unreserveNamespace(manager, namespaceId, fateId, LockType.READ);
     }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java
index d0a9f5037a..6ec3a13d44 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java
@@ -71,20 +71,12 @@ class ImportPopulateZookeeper extends ManagerRepo {
   public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
 
     var context = env.getContext();
-    // reserve the table name in zookeeper or fail
-    Utils.getTableNameLock().lock();
-
-    try {
-
-      // write tableName & tableId, first to Table Mapping and then to 
Zookeeper
-      context.getTableMapping(tableInfo.namespaceId).put(tableInfo.tableId, 
tableInfo.tableName,
-          TableOperation.IMPORT);
-      env.getTableManager().addTable(tableInfo.tableId, tableInfo.namespaceId, 
tableInfo.tableName);
+    // write tableName & tableId, first to Table Mapping and then to Zookeeper
+    context.getTableMapping(tableInfo.namespaceId).put(tableInfo.tableId, 
tableInfo.tableName,
+        TableOperation.IMPORT);
+    env.getTableManager().addTable(tableInfo.tableId, tableInfo.namespaceId, 
tableInfo.tableName);
 
-      context.clearTableListCache();
-    } finally {
-      Utils.getTableNameLock().unlock();
-    }
+    context.clearTableListCache();
 
     VolumeManager volMan = env.getVolumeManager();
 
diff --git a/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java 
b/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
index 75730ed8b1..3e91e67b3e 100644
--- a/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
@@ -44,7 +44,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
@@ -81,7 +80,6 @@ import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.manager.tableOps.Utils;
 import org.apache.accumulo.test.functional.BadIterator;
 import org.apache.accumulo.test.functional.FunctionalTestUtils;
-import org.apache.accumulo.test.util.Wait;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterEach;
@@ -213,16 +211,14 @@ public class TableOperationsIT extends 
AccumuloClusterHarness {
     ExecutorService pool = Executors.newFixedThreadPool(numTasks);
 
     for (String tablename : getUniqueNames(30)) {
-      CountDownLatch startSignal = new CountDownLatch(1);
-      AtomicInteger numTasksRunning = new AtomicInteger(0);
-
-      List<Future<Boolean>> futureList = new ArrayList<>();
+      CountDownLatch startSignal = new CountDownLatch(numTasks);
+      List<Future<Boolean>> futureList = new ArrayList<>(numTasks);
 
       for (int j = 0; j < numTasks; j++) {
         Future<Boolean> future = pool.submit(() -> {
           boolean result;
           try {
-            numTasksRunning.incrementAndGet();
+            startSignal.countDown();
             startSignal.await();
             accumuloClient.tableOperations().create(tablename);
             result = true;
@@ -234,10 +230,6 @@ public class TableOperationsIT extends 
AccumuloClusterHarness {
         futureList.add(future);
       }
 
-      Wait.waitFor(() -> numTasksRunning.get() == numTasks);
-
-      startSignal.countDown();
-
       int taskSucceeded = 0;
       int taskFailed = 0;
       for (Future<Boolean> result : futureList) {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentTableNameOperationsIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentTableNameOperationsIT.java
new file mode 100644
index 0000000000..9dc5b70984
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentTableNameOperationsIT.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.NamespaceExistsException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.metadata.SystemTables;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Ensure that concurrent table and namespace operations that target the same 
name are handled
+ * correctly.
+ */
+public class ConcurrentTableNameOperationsIT extends SharedMiniClusterBase {
+
+  static AccumuloClient client;
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(3);
+  }
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+    client = Accumulo.newClient().from(getClientProps()).build();
+  }
+
+  @AfterEach
+  public void cleanUpTables() throws Exception {
+    for (String table : client.tableOperations().list()) {
+      if (!SystemTables.containsTableName(table)) {
+        client.tableOperations().delete(table);
+      }
+    }
+  }
+
+  @AfterAll
+  public static void teardown() {
+    client.close();
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  /**
+   * Test concurrent cloning of tables with the same target name.
+   */
+  @Test
+  public void cloneTable() throws Exception {
+    final int numTasks = 16;
+    final int numIterations = 8;
+    ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+
+    for (String targetTableName : getUniqueNames(numIterations)) {
+      List<String> sourceTableNames = new ArrayList<>();
+      for (int i = 0; i < numTasks; i++) {
+        String sourceTable = targetTableName + "_source_" + i;
+        client.tableOperations().create(sourceTable);
+        sourceTableNames.add(sourceTable);
+      }
+
+      int tableCountBefore = client.tableOperations().list().size();
+
+      int successCount = runConcurrentTableOperation(pool, numTasks, (index) 
-> {
+        client.tableOperations().clone(sourceTableNames.get(index), 
targetTableName, true, Map.of(),
+            Set.of());
+        return true;
+      });
+
+      assertEquals(1, successCount, "Expected only one clone operation to 
succeed");
+      assertTrue(client.tableOperations().exists(targetTableName),
+          "Expected target table " + targetTableName + " to exist");
+      assertEquals(tableCountBefore + 1, 
client.tableOperations().list().size(),
+          "Expected only one new table after clone");
+    }
+
+    pool.shutdown();
+  }
+
+  /**
+   * Test concurrent renaming of tables to the same target name.
+   */
+  @Test
+  public void renameTable() throws Exception {
+    final int numTasks = 16;
+    final int numIterations = 10;
+    ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+
+    for (String targetTableName : getUniqueNames(numIterations)) {
+      List<String> sourceTableNames = new ArrayList<>();
+      for (int i = 0; i < numTasks; i++) {
+        String sourceTable = targetTableName + "_rename_source_" + i;
+        client.tableOperations().create(sourceTable);
+        sourceTableNames.add(sourceTable);
+      }
+
+      int tableCountBefore = client.tableOperations().list().size();
+
+      int successCount = runConcurrentTableOperation(pool, numTasks, (index) 
-> {
+        client.tableOperations().rename(sourceTableNames.get(index), 
targetTableName);
+        return true;
+      });
+
+      assertEquals(1, successCount, "Expected only one rename operation to 
succeed");
+      assertTrue(client.tableOperations().exists(targetTableName),
+          "Expected target table " + targetTableName + " to exist");
+      assertEquals(tableCountBefore, client.tableOperations().list().size());
+    }
+
+    pool.shutdown();
+  }
+
+  /**
+   * Test that when several threads attempt to import to the same table name 
simultaneously, only
+   * one import succeeds.
+   */
+  @Test
+  public void importTable() throws Exception {
+    final int numTasks = 16;
+    final int numIterations = 4;
+    ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+    String[] targetTableNames = getUniqueNames(numIterations);
+    var ntc = new NewTableConfiguration().createOffline();
+
+    for (String importTableName : targetTableNames) {
+      // Create separate source tables and export directories for each thread
+      List<String> exportDirs = new ArrayList<>(numTasks);
+      for (int i = 0; i < numTasks; i++) {
+        String sourceTableName = importTableName + "_export_source_" + i;
+        client.tableOperations().create(sourceTableName, ntc);
+        String exportDir = getCluster().getTemporaryPath() + "/export_" + 
sourceTableName;
+        client.tableOperations().exportTable(sourceTableName, exportDir);
+        exportDirs.add(exportDir);
+      }
+
+      int tableCountBefore = client.tableOperations().list().size();
+
+      // All threads attempt to import to the same target table name
+      int successCount = runConcurrentTableOperation(pool, numTasks, (index) 
-> {
+        client.tableOperations().importTable(importTableName, 
exportDirs.get(index));
+        return true;
+      });
+
+      assertEquals(1, successCount, "Expected only one import operation to 
succeed");
+      assertTrue(client.tableOperations().exists(importTableName),
+          "Expected import table " + importTableName + " to exist");
+      assertEquals(tableCountBefore + 1, 
client.tableOperations().list().size(),
+          "Expected +1 table count for import operation");
+    }
+
+    pool.shutdown();
+  }
+
+  /**
+   * Test that when several operations all target the same table name, only 
one operation
+   * successfully creates that table.
+   */
+  @Test
+  public void mixedTableOperations() throws Exception {
+    final int operationsPerType = 10;
+    final int numTasks = operationsPerType * 3;
+    final int numIterations = 4;
+    ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+    String[] expectedTableNames = getUniqueNames(numIterations);
+
+    for (String targetTableName : expectedTableNames) {
+      List<String> cloneSourceTables = new ArrayList<>();
+      List<String> renameSourceTables = new ArrayList<>();
+      for (int i = 0; i < operationsPerType; i++) {
+        String cloneSource = targetTableName + "_clone_src_" + i;
+        client.tableOperations().create(cloneSource);
+        cloneSourceTables.add(cloneSource);
+
+        String renameSource = targetTableName + "_rename_src_" + i;
+        client.tableOperations().create(renameSource);
+        renameSourceTables.add(renameSource);
+      }
+
+      int tableCountBefore = client.tableOperations().list().size();
+
+      List<Future<Boolean>> futures = new ArrayList<>();
+      CountDownLatch startSignal = new CountDownLatch(numTasks);
+      AtomicReference<String> successfulOperation = new AtomicReference<>();
+
+      for (int i = 0; i < operationsPerType; i++) {
+        futures.add(pool.submit(() -> {
+          try {
+            startSignal.countDown();
+            startSignal.await();
+            client.tableOperations().create(targetTableName);
+            successfulOperation.set("create");
+            return true;
+          } catch (TableExistsException e) {
+            return false;
+          }
+        }));
+
+        final int index = i;
+
+        futures.add(pool.submit(() -> {
+          try {
+            startSignal.countDown();
+            startSignal.await();
+            client.tableOperations().rename(renameSourceTables.get(index), 
targetTableName);
+            successfulOperation.set("rename");
+            return true;
+          } catch (TableExistsException e) {
+            return false;
+          }
+        }));
+
+        futures.add(pool.submit(() -> {
+          try {
+            startSignal.countDown();
+            startSignal.await();
+            client.tableOperations().clone(cloneSourceTables.get(index), 
targetTableName, true,
+                Map.of(), Set.of());
+            successfulOperation.set("clone");
+            return true;
+          } catch (TableExistsException e) {
+            return false;
+          }
+        }));
+      }
+
+      assertEquals(numTasks, futures.size(),
+          "Actual created task count did not match expected count");
+
+      int successCount = 0;
+      for (Future<Boolean> future : futures) {
+        if (future.get()) {
+          successCount++;
+        }
+      }
+
+      assertEquals(1, successCount, "Expected only one operation to succeed");
+
+      int tableCountAfter = client.tableOperations().list().size();
+      assertTrue(client.tableOperations().exists(targetTableName),
+          "Expected target table " + targetTableName + " to exist");
+
+      String operation = successfulOperation.get();
+      if ("create".equals(operation) || "clone".equals(operation)) {
+        assertEquals(tableCountBefore + 1, tableCountAfter,
+            "Expected +1 table count for " + operation);
+      } else if ("rename".equals(operation)) {
+        assertEquals(tableCountBefore, tableCountAfter, "Expected same table 
count for rename");
+      }
+    }
+
+    pool.shutdown();
+  }
+
+  /**
+   * Test that when several threads attempt to create the same namespace 
simultaneously, only one
+   * creation succeeds.
+   */
+  @Test
+  public void createNamespace() throws Exception {
+    final int numTasks = 16;
+    final int numIterations = 16;
+    ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+    String[] targetNamespaceNames = getUniqueNames(numIterations);
+
+    for (String namespaceName : targetNamespaceNames) {
+      Set<String> namespacesBefore = client.namespaceOperations().list();
+
+      int successCount = runConcurrentNamespaceOperation(pool, numTasks, 
(index) -> {
+        client.namespaceOperations().create(namespaceName);
+        return true;
+      });
+
+      assertEquals(1, successCount, "Expected only one create operation to 
succeed");
+      assertTrue(client.namespaceOperations().exists(namespaceName),
+          "Expected namespace " + namespaceName + " to exist");
+
+      Set<String> namespacesAfter = client.namespaceOperations().list();
+      Set<String> newNamespaces = new HashSet<>(namespacesAfter);
+      newNamespaces.removeAll(namespacesBefore);
+      assertEquals(Set.of(namespaceName), newNamespaces,
+          "Expected exactly one new namespace: " + namespaceName);
+
+      client.namespaceOperations().delete(namespaceName);
+    }
+
+    pool.shutdown();
+  }
+
+  /**
+   * Test that when several threads attempt to rename to the same namespace 
name simultaneously,
+   * only one rename succeeds.
+   */
+  @Test
+  public void renameNamespace() throws Exception {
+    final int numTasks = 16;
+    final int numIterations = 8;
+    ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+    String[] targetNamespaceNames = getUniqueNames(numIterations);
+
+    for (String targetNamespaceName : targetNamespaceNames) {
+      // multiple source namespaces for rename ops
+      List<String> sourceNamespaces = new ArrayList<>();
+      for (int i = 0; i < numTasks; i++) {
+        String sourceNamespace = targetNamespaceName + "_source_" + i;
+        client.namespaceOperations().create(sourceNamespace);
+        sourceNamespaces.add(sourceNamespace);
+      }
+
+      Set<String> namespacesBefore = client.namespaceOperations().list();
+
+      int successCount = runConcurrentNamespaceOperation(pool, numTasks, 
(index) -> {
+        client.namespaceOperations().rename(sourceNamespaces.get(index), 
targetNamespaceName);
+        return true;
+      });
+
+      assertEquals(1, successCount, "Expected only one rename operation to 
succeed");
+      assertTrue(client.namespaceOperations().exists(targetNamespaceName),
+          "Expected target namespace " + targetNamespaceName + " to exist");
+
+      Set<String> namespacesAfter = client.namespaceOperations().list();
+      assertEquals(namespacesBefore.size(), namespacesAfter.size(),
+          "Expected same namespace count (rename operation)");
+      assertTrue(namespacesAfter.contains(targetNamespaceName),
+          "Expected target namespace in final list");
+
+      for (String sourceNamespace : sourceNamespaces) {
+        if (client.namespaceOperations().exists(sourceNamespace)) {
+          client.namespaceOperations().delete(sourceNamespace);
+        }
+      }
+      if (client.namespaceOperations().exists(targetNamespaceName)) {
+        client.namespaceOperations().delete(targetNamespaceName);
+      }
+    }
+
+    pool.shutdown();
+  }
+
+  private int runConcurrentTableOperation(ExecutorService pool, int numTasks,
+      ConcurrentOperation operation) throws ExecutionException, 
InterruptedException {
+    return runConcurrentOperation(pool, numTasks, operation, 
TableExistsException.class);
+  }
+
+  private int runConcurrentNamespaceOperation(ExecutorService pool, int 
numTasks,
+      ConcurrentOperation operation) throws ExecutionException, 
InterruptedException {
+    return runConcurrentOperation(pool, numTasks, operation, 
NamespaceExistsException.class);
+  }
+
+  private int runConcurrentOperation(ExecutorService pool, int numTasks,
+      ConcurrentOperation operation, Class<? extends Exception> 
expectedExceptionType)
+      throws ExecutionException, InterruptedException {
+    CountDownLatch startSignal = new CountDownLatch(numTasks);
+    List<Future<Boolean>> futures = new ArrayList<>(numTasks);
+
+    for (int i = 0; i < numTasks; i++) {
+      final int index = i;
+      futures.add(pool.submit(() -> {
+        try {
+          startSignal.countDown();
+          startSignal.await();
+          return operation.execute(index);
+        } catch (Exception e) {
+          if (expectedExceptionType.isInstance(e)) {
+            return false;
+          }
+          throw new RuntimeException(e);
+        }
+      }));
+    }
+
+    assertEquals(numTasks, futures.size(),
+        "Actual created task count did not match expected count");
+
+    int successCount = 0;
+    for (Future<Boolean> future : futures) {
+      if (future.get()) {
+        successCount++;
+      }
+    }
+
+    return successCount;
+  }
+
+  @FunctionalInterface
+  private interface ConcurrentOperation {
+    boolean execute(int index) throws Exception;
+  }
+}

Reply via email to