This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 78cb5207c2 Remove usage of Utils.getTableNameLock() & add concurrency
test cases (#5822)
78cb5207c2 is described below
commit 78cb5207c29bbc8ac3e8068f0a64c03d59ad007f
Author: Dom G. <[email protected]>
AuthorDate: Fri Aug 29 12:18:31 2025 -0400
Remove usage of Utils.getTableNameLock() & add concurrency test cases
(#5822)
* Remove Utils.getTableNameLock() and all places it was used
* Add test cases with concurrent operations to ensure that the places where
the locks were removed still behave correctly without them
---
.../apache/accumulo/manager/tableOps/Utils.java | 8 -
.../manager/tableOps/clone/CloneZookeeper.java | 23 +-
.../create/PopulateZookeeperWithNamespace.java | 21 +-
.../tableOps/namespace/rename/RenameNamespace.java | 2 -
.../manager/tableOps/rename/RenameTable.java | 2 -
.../tableImport/ImportPopulateZookeeper.java | 18 +-
.../apache/accumulo/test/TableOperationsIT.java | 14 +-
.../ConcurrentTableNameOperationsIT.java | 426 +++++++++++++++++++++
8 files changed, 451 insertions(+), 63 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
index a88afc6ee4..60eb2c4d74 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
@@ -29,8 +29,6 @@ import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.accumulo.core.Constants;
@@ -89,8 +87,6 @@ public class Utils {
}
}
- static final Lock tableNameLock = new ReentrantLock();
-
private static KeyExtent findContaining(Ample ample, TableId tableId, Text
row) {
Objects.requireNonNull(row);
try (var tablets = ample.readTablets().forTable(tableId).overlapping(row,
true, row)
@@ -315,10 +311,6 @@ public class Utils {
return lock;
}
- public static Lock getTableNameLock() {
- return tableNameLock;
- }
-
public static DistributedLock getReadLock(Manager env, AbstractId<?> id,
FateId fateId,
LockRange range) {
return Utils.getLock(env.getContext(), id, fateId, LockType.READ, range);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java
index b3d47e5f1c..40e17990f5 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java
@@ -53,21 +53,16 @@ class CloneZookeeper extends ManagerRepo {
@Override
public Repo<Manager> call(FateId fateId, Manager environment) throws
Exception {
- Utils.getTableNameLock().lock();
- try {
- var context = environment.getContext();
- // write tableName & tableId, first to Table Mapping and then to
Zookeeper
-
context.getTableMapping(cloneInfo.getNamespaceId()).put(cloneInfo.getTableId(),
- cloneInfo.getTableName(), TableOperation.CLONE);
- environment.getTableManager().cloneTable(cloneInfo.getSrcTableId(),
cloneInfo.getTableId(),
- cloneInfo.getTableName(), cloneInfo.getNamespaceId(),
cloneInfo.getPropertiesToSet(),
- cloneInfo.getPropertiesToExclude());
- context.clearTableListCache();
+ var context = environment.getContext();
+ // write tableName & tableId, first to Table Mapping and then to Zookeeper
+
context.getTableMapping(cloneInfo.getNamespaceId()).put(cloneInfo.getTableId(),
+ cloneInfo.getTableName(), TableOperation.CLONE);
+ environment.getTableManager().cloneTable(cloneInfo.getSrcTableId(),
cloneInfo.getTableId(),
+ cloneInfo.getTableName(), cloneInfo.getNamespaceId(),
cloneInfo.getPropertiesToSet(),
+ cloneInfo.getPropertiesToExclude());
+ context.clearTableListCache();
- return new CloneMetadata(cloneInfo);
- } finally {
- Utils.getTableNameLock().unlock();
- }
+ return new CloneMetadata(cloneInfo);
}
@Override
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/PopulateZookeeperWithNamespace.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/PopulateZookeeperWithNamespace.java
index b1a91f6e17..654a117daa 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/PopulateZookeeperWithNamespace.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/create/PopulateZookeeperWithNamespace.java
@@ -48,22 +48,17 @@ class PopulateZookeeperWithNamespace extends ManagerRepo {
@Override
public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
- Utils.getTableNameLock().lock();
- try {
- var context = manager.getContext();
- context.getNamespaceMapping().put(namespaceInfo.namespaceId,
namespaceInfo.namespaceName);
-
context.getTableManager().prepareNewNamespaceState(namespaceInfo.namespaceId,
- namespaceInfo.namespaceName, NodeExistsPolicy.OVERWRITE);
+ var context = manager.getContext();
+ context.getNamespaceMapping().put(namespaceInfo.namespaceId,
namespaceInfo.namespaceName);
+
context.getTableManager().prepareNewNamespaceState(namespaceInfo.namespaceId,
+ namespaceInfo.namespaceName, NodeExistsPolicy.OVERWRITE);
- PropUtil.setProperties(context,
NamespacePropKey.of(namespaceInfo.namespaceId),
- namespaceInfo.props);
+ PropUtil.setProperties(context,
NamespacePropKey.of(namespaceInfo.namespaceId),
+ namespaceInfo.props);
- context.clearTableListCache();
+ context.clearTableListCache();
- return new FinishCreateNamespace(namespaceInfo);
- } finally {
- Utils.getTableNameLock().unlock();
- }
+ return new FinishCreateNamespace(namespaceInfo);
}
@Override
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/rename/RenameNamespace.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/rename/RenameNamespace.java
index ccb8669d1b..6e9161e39e 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/rename/RenameNamespace.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/namespace/rename/RenameNamespace.java
@@ -50,12 +50,10 @@ public class RenameNamespace extends ManagerRepo {
@Override
public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
- Utils.getTableNameLock().lock();
try {
manager.getContext().getNamespaceMapping().rename(namespaceId, oldName,
newName);
manager.getContext().clearTableListCache();
} finally {
- Utils.getTableNameLock().unlock();
Utils.unreserveNamespace(manager, namespaceId, fateId, LockType.WRITE);
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java
index 9e0fe1711f..e0c078958c 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java
@@ -62,14 +62,12 @@ public class RenameTable extends ManagerRepo {
String newSimpleTableName =
TableNameUtil.qualify(newTableName).getSecond();
var context = manager.getContext();
- Utils.getTableNameLock().lock();
try {
context.getTableMapping(namespaceId).rename(tableId,
qualifiedOldTableName.getSecond(),
newSimpleTableName);
context.clearTableListCache();
} finally {
- Utils.getTableNameLock().unlock();
Utils.unreserveTable(manager, tableId, fateId, LockType.WRITE);
Utils.unreserveNamespace(manager, namespaceId, fateId, LockType.READ);
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java
index d0a9f5037a..6ec3a13d44 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java
@@ -71,20 +71,12 @@ class ImportPopulateZookeeper extends ManagerRepo {
public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
var context = env.getContext();
- // reserve the table name in zookeeper or fail
- Utils.getTableNameLock().lock();
-
- try {
-
- // write tableName & tableId, first to Table Mapping and then to
Zookeeper
- context.getTableMapping(tableInfo.namespaceId).put(tableInfo.tableId,
tableInfo.tableName,
- TableOperation.IMPORT);
- env.getTableManager().addTable(tableInfo.tableId, tableInfo.namespaceId,
tableInfo.tableName);
+ // write tableName & tableId, first to Table Mapping and then to Zookeeper
+ context.getTableMapping(tableInfo.namespaceId).put(tableInfo.tableId,
tableInfo.tableName,
+ TableOperation.IMPORT);
+ env.getTableManager().addTable(tableInfo.tableId, tableInfo.namespaceId,
tableInfo.tableName);
- context.clearTableListCache();
- } finally {
- Utils.getTableNameLock().unlock();
- }
+ context.clearTableListCache();
VolumeManager volMan = env.getVolumeManager();
diff --git a/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
b/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
index 75730ed8b1..3e91e67b3e 100644
--- a/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
@@ -44,7 +44,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
@@ -81,7 +80,6 @@ import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.manager.tableOps.Utils;
import org.apache.accumulo.test.functional.BadIterator;
import org.apache.accumulo.test.functional.FunctionalTestUtils;
-import org.apache.accumulo.test.util.Wait;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterEach;
@@ -213,16 +211,14 @@ public class TableOperationsIT extends
AccumuloClusterHarness {
ExecutorService pool = Executors.newFixedThreadPool(numTasks);
for (String tablename : getUniqueNames(30)) {
- CountDownLatch startSignal = new CountDownLatch(1);
- AtomicInteger numTasksRunning = new AtomicInteger(0);
-
- List<Future<Boolean>> futureList = new ArrayList<>();
+ CountDownLatch startSignal = new CountDownLatch(numTasks);
+ List<Future<Boolean>> futureList = new ArrayList<>(numTasks);
for (int j = 0; j < numTasks; j++) {
Future<Boolean> future = pool.submit(() -> {
boolean result;
try {
- numTasksRunning.incrementAndGet();
+ startSignal.countDown();
startSignal.await();
accumuloClient.tableOperations().create(tablename);
result = true;
@@ -234,10 +230,6 @@ public class TableOperationsIT extends
AccumuloClusterHarness {
futureList.add(future);
}
- Wait.waitFor(() -> numTasksRunning.get() == numTasks);
-
- startSignal.countDown();
-
int taskSucceeded = 0;
int taskFailed = 0;
for (Future<Boolean> result : futureList) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentTableNameOperationsIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentTableNameOperationsIT.java
new file mode 100644
index 0000000000..9dc5b70984
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentTableNameOperationsIT.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.NamespaceExistsException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.metadata.SystemTables;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Ensure that concurrent table and namespace operations that target the same
name are handled
+ * correctly.
+ */
+public class ConcurrentTableNameOperationsIT extends SharedMiniClusterBase {
+
+ static AccumuloClient client;
+
+ @Override
+ protected Duration defaultTimeout() {
+ return Duration.ofMinutes(3);
+ }
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ SharedMiniClusterBase.startMiniCluster();
+ client = Accumulo.newClient().from(getClientProps()).build();
+ }
+
+ @AfterEach
+ public void cleanUpTables() throws Exception {
+ for (String table : client.tableOperations().list()) {
+ if (!SystemTables.containsTableName(table)) {
+ client.tableOperations().delete(table);
+ }
+ }
+ }
+
+ @AfterAll
+ public static void teardown() {
+ client.close();
+ SharedMiniClusterBase.stopMiniCluster();
+ }
+
+ /**
+ * Test concurrent cloning of tables with the same target name.
+ */
+ @Test
+ public void cloneTable() throws Exception {
+ final int numTasks = 16;
+ final int numIterations = 8;
+ ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+
+ for (String targetTableName : getUniqueNames(numIterations)) {
+ List<String> sourceTableNames = new ArrayList<>();
+ for (int i = 0; i < numTasks; i++) {
+ String sourceTable = targetTableName + "_source_" + i;
+ client.tableOperations().create(sourceTable);
+ sourceTableNames.add(sourceTable);
+ }
+
+ int tableCountBefore = client.tableOperations().list().size();
+
+ int successCount = runConcurrentTableOperation(pool, numTasks, (index)
-> {
+ client.tableOperations().clone(sourceTableNames.get(index),
targetTableName, true, Map.of(),
+ Set.of());
+ return true;
+ });
+
+ assertEquals(1, successCount, "Expected only one clone operation to
succeed");
+ assertTrue(client.tableOperations().exists(targetTableName),
+ "Expected target table " + targetTableName + " to exist");
+ assertEquals(tableCountBefore + 1,
client.tableOperations().list().size(),
+ "Expected only one new table after clone");
+ }
+
+ pool.shutdown();
+ }
+
+ /**
+ * Test concurrent renaming of tables to the same target name.
+ */
+ @Test
+ public void renameTable() throws Exception {
+ final int numTasks = 16;
+ final int numIterations = 10;
+ ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+
+ for (String targetTableName : getUniqueNames(numIterations)) {
+ List<String> sourceTableNames = new ArrayList<>();
+ for (int i = 0; i < numTasks; i++) {
+ String sourceTable = targetTableName + "_rename_source_" + i;
+ client.tableOperations().create(sourceTable);
+ sourceTableNames.add(sourceTable);
+ }
+
+ int tableCountBefore = client.tableOperations().list().size();
+
+ int successCount = runConcurrentTableOperation(pool, numTasks, (index)
-> {
+ client.tableOperations().rename(sourceTableNames.get(index),
targetTableName);
+ return true;
+ });
+
+ assertEquals(1, successCount, "Expected only one rename operation to
succeed");
+ assertTrue(client.tableOperations().exists(targetTableName),
+ "Expected target table " + targetTableName + " to exist");
+ assertEquals(tableCountBefore, client.tableOperations().list().size());
+ }
+
+ pool.shutdown();
+ }
+
+ /**
+ * Test that when several threads attempt to import to the same table name
simultaneously, only
+ * one import succeeds.
+ */
+ @Test
+ public void importTable() throws Exception {
+ final int numTasks = 16;
+ final int numIterations = 4;
+ ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+ String[] targetTableNames = getUniqueNames(numIterations);
+ var ntc = new NewTableConfiguration().createOffline();
+
+ for (String importTableName : targetTableNames) {
+ // Create separate source tables and export directories for each thread
+ List<String> exportDirs = new ArrayList<>(numTasks);
+ for (int i = 0; i < numTasks; i++) {
+ String sourceTableName = importTableName + "_export_source_" + i;
+ client.tableOperations().create(sourceTableName, ntc);
+ String exportDir = getCluster().getTemporaryPath() + "/export_" +
sourceTableName;
+ client.tableOperations().exportTable(sourceTableName, exportDir);
+ exportDirs.add(exportDir);
+ }
+
+ int tableCountBefore = client.tableOperations().list().size();
+
+ // All threads attempt to import to the same target table name
+ int successCount = runConcurrentTableOperation(pool, numTasks, (index)
-> {
+ client.tableOperations().importTable(importTableName,
exportDirs.get(index));
+ return true;
+ });
+
+ assertEquals(1, successCount, "Expected only one import operation to
succeed");
+ assertTrue(client.tableOperations().exists(importTableName),
+ "Expected import table " + importTableName + " to exist");
+ assertEquals(tableCountBefore + 1,
client.tableOperations().list().size(),
+ "Expected +1 table count for import operation");
+ }
+
+ pool.shutdown();
+ }
+
+ /**
+ * Test that when several operations all target the same table name, only
one operation
+ * successfully creates that table.
+ */
+ @Test
+ public void mixedTableOperations() throws Exception {
+ final int operationsPerType = 10;
+ final int numTasks = operationsPerType * 3;
+ final int numIterations = 4;
+ ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+ String[] expectedTableNames = getUniqueNames(numIterations);
+
+ for (String targetTableName : expectedTableNames) {
+ List<String> cloneSourceTables = new ArrayList<>();
+ List<String> renameSourceTables = new ArrayList<>();
+ for (int i = 0; i < operationsPerType; i++) {
+ String cloneSource = targetTableName + "_clone_src_" + i;
+ client.tableOperations().create(cloneSource);
+ cloneSourceTables.add(cloneSource);
+
+ String renameSource = targetTableName + "_rename_src_" + i;
+ client.tableOperations().create(renameSource);
+ renameSourceTables.add(renameSource);
+ }
+
+ int tableCountBefore = client.tableOperations().list().size();
+
+ List<Future<Boolean>> futures = new ArrayList<>();
+ CountDownLatch startSignal = new CountDownLatch(numTasks);
+ AtomicReference<String> successfulOperation = new AtomicReference<>();
+
+ for (int i = 0; i < operationsPerType; i++) {
+ futures.add(pool.submit(() -> {
+ try {
+ startSignal.countDown();
+ startSignal.await();
+ client.tableOperations().create(targetTableName);
+ successfulOperation.set("create");
+ return true;
+ } catch (TableExistsException e) {
+ return false;
+ }
+ }));
+
+ final int index = i;
+
+ futures.add(pool.submit(() -> {
+ try {
+ startSignal.countDown();
+ startSignal.await();
+ client.tableOperations().rename(renameSourceTables.get(index),
targetTableName);
+ successfulOperation.set("rename");
+ return true;
+ } catch (TableExistsException e) {
+ return false;
+ }
+ }));
+
+ futures.add(pool.submit(() -> {
+ try {
+ startSignal.countDown();
+ startSignal.await();
+ client.tableOperations().clone(cloneSourceTables.get(index),
targetTableName, true,
+ Map.of(), Set.of());
+ successfulOperation.set("clone");
+ return true;
+ } catch (TableExistsException e) {
+ return false;
+ }
+ }));
+ }
+
+ assertEquals(numTasks, futures.size(),
+ "Actual created task count did not match expected count");
+
+ int successCount = 0;
+ for (Future<Boolean> future : futures) {
+ if (future.get()) {
+ successCount++;
+ }
+ }
+
+ assertEquals(1, successCount, "Expected only one operation to succeed");
+
+ int tableCountAfter = client.tableOperations().list().size();
+ assertTrue(client.tableOperations().exists(targetTableName),
+ "Expected target table " + targetTableName + " to exist");
+
+ String operation = successfulOperation.get();
+ if ("create".equals(operation) || "clone".equals(operation)) {
+ assertEquals(tableCountBefore + 1, tableCountAfter,
+ "Expected +1 table count for " + operation);
+ } else if ("rename".equals(operation)) {
+ assertEquals(tableCountBefore, tableCountAfter, "Expected same table
count for rename");
+ }
+ }
+
+ pool.shutdown();
+ }
+
+ /**
+ * Test that when several threads attempt to create the same namespace
simultaneously, only one
+ * creation succeeds.
+ */
+ @Test
+ public void createNamespace() throws Exception {
+ final int numTasks = 16;
+ final int numIterations = 16;
+ ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+ String[] targetNamespaceNames = getUniqueNames(numIterations);
+
+ for (String namespaceName : targetNamespaceNames) {
+ Set<String> namespacesBefore = client.namespaceOperations().list();
+
+ int successCount = runConcurrentNamespaceOperation(pool, numTasks,
(index) -> {
+ client.namespaceOperations().create(namespaceName);
+ return true;
+ });
+
+ assertEquals(1, successCount, "Expected only one create operation to
succeed");
+ assertTrue(client.namespaceOperations().exists(namespaceName),
+ "Expected namespace " + namespaceName + " to exist");
+
+ Set<String> namespacesAfter = client.namespaceOperations().list();
+ Set<String> newNamespaces = new HashSet<>(namespacesAfter);
+ newNamespaces.removeAll(namespacesBefore);
+ assertEquals(Set.of(namespaceName), newNamespaces,
+ "Expected exactly one new namespace: " + namespaceName);
+
+ client.namespaceOperations().delete(namespaceName);
+ }
+
+ pool.shutdown();
+ }
+
+ /**
+ * Test that when several threads attempt to rename to the same namespace
name simultaneously,
+ * only one rename succeeds.
+ */
+ @Test
+ public void renameNamespace() throws Exception {
+ final int numTasks = 16;
+ final int numIterations = 8;
+ ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+ String[] targetNamespaceNames = getUniqueNames(numIterations);
+
+ for (String targetNamespaceName : targetNamespaceNames) {
+ // multiple source namespaces for rename ops
+ List<String> sourceNamespaces = new ArrayList<>();
+ for (int i = 0; i < numTasks; i++) {
+ String sourceNamespace = targetNamespaceName + "_source_" + i;
+ client.namespaceOperations().create(sourceNamespace);
+ sourceNamespaces.add(sourceNamespace);
+ }
+
+ Set<String> namespacesBefore = client.namespaceOperations().list();
+
+ int successCount = runConcurrentNamespaceOperation(pool, numTasks,
(index) -> {
+ client.namespaceOperations().rename(sourceNamespaces.get(index),
targetNamespaceName);
+ return true;
+ });
+
+ assertEquals(1, successCount, "Expected only one rename operation to
succeed");
+ assertTrue(client.namespaceOperations().exists(targetNamespaceName),
+ "Expected target namespace " + targetNamespaceName + " to exist");
+
+ Set<String> namespacesAfter = client.namespaceOperations().list();
+ assertEquals(namespacesBefore.size(), namespacesAfter.size(),
+ "Expected same namespace count (rename operation)");
+ assertTrue(namespacesAfter.contains(targetNamespaceName),
+ "Expected target namespace in final list");
+
+ for (String sourceNamespace : sourceNamespaces) {
+ if (client.namespaceOperations().exists(sourceNamespace)) {
+ client.namespaceOperations().delete(sourceNamespace);
+ }
+ }
+ if (client.namespaceOperations().exists(targetNamespaceName)) {
+ client.namespaceOperations().delete(targetNamespaceName);
+ }
+ }
+
+ pool.shutdown();
+ }
+
+ private int runConcurrentTableOperation(ExecutorService pool, int numTasks,
+ ConcurrentOperation operation) throws ExecutionException,
InterruptedException {
+ return runConcurrentOperation(pool, numTasks, operation,
TableExistsException.class);
+ }
+
+ private int runConcurrentNamespaceOperation(ExecutorService pool, int
numTasks,
+ ConcurrentOperation operation) throws ExecutionException,
InterruptedException {
+ return runConcurrentOperation(pool, numTasks, operation,
NamespaceExistsException.class);
+ }
+
+ private int runConcurrentOperation(ExecutorService pool, int numTasks,
+ ConcurrentOperation operation, Class<? extends Exception>
expectedExceptionType)
+ throws ExecutionException, InterruptedException {
+ CountDownLatch startSignal = new CountDownLatch(numTasks);
+ List<Future<Boolean>> futures = new ArrayList<>(numTasks);
+
+ for (int i = 0; i < numTasks; i++) {
+ final int index = i;
+ futures.add(pool.submit(() -> {
+ try {
+ startSignal.countDown();
+ startSignal.await();
+ return operation.execute(index);
+ } catch (Exception e) {
+ if (expectedExceptionType.isInstance(e)) {
+ return false;
+ }
+ throw new RuntimeException(e);
+ }
+ }));
+ }
+
+ assertEquals(numTasks, futures.size(),
+ "Actual created task count did not match expected count");
+
+ int successCount = 0;
+ for (Future<Boolean> future : futures) {
+ if (future.get()) {
+ successCount++;
+ }
+ }
+
+ return successCount;
+ }
+
+ @FunctionalInterface
+ private interface ConcurrentOperation {
+ boolean execute(int index) throws Exception;
+ }
+}