Repository: accumulo
Updated Branches:
  refs/heads/master b31ce443f -> 2f5203c24
ACCUMULO-4575 Fixed concurrent delete table bug

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/df400c59
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/df400c59
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/df400c59

Branch: refs/heads/master
Commit: df400c59efd2274d8714cad4c9d3648bb0845c50
Parents: 4ea66d4
Author: Keith Turner <ktur...@apache.org>
Authored: Wed Jan 25 12:19:01 2017 -0500
Committer: Keith Turner <ktur...@apache.org>
Committed: Wed Jan 25 12:19:01 2017 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/fate/AdminUtil.java        | 104 +++++++++++--
 .../accumulo/cluster/AccumuloCluster.java          |   6 +
 .../standalone/StandaloneAccumuloCluster.java      |  12 ++
 .../impl/MiniAccumuloClusterImpl.java              |   8 +
 .../accumulo/master/FateServiceHandler.java        |   2 +-
 .../accumulo/master/tableOps/DeleteTable.java      |  25 +++-
 .../apache/accumulo/master/tableOps/Utils.java     |  13 ++
 .../test/functional/BackupMasterIT.java            |   7 +-
 .../functional/ConcurrentDeleteTableIT.java        | 147 +++++++++++++++++++
 9 files changed, 303 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
index f6aa811..b8baa67 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.fate.ReadOnlyTStore.TStatus;
+import org.apache.accumulo.fate.zookeeper.IZooReader;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
@@ -62,11 +63,77 @@ public class AdminUtil<T> {
     this.exitOnError = exitOnError;
   }
 
-  public void print(ReadOnlyTStore<T> zs, IZooReaderWriter zk, String lockPath) throws KeeperException, InterruptedException {
-    print(zs, zk, lockPath, new Formatter(System.out), null, null);
+  public static class TransactionStatus {
+
+    private final long txid;
+    private final TStatus status;
+    private final String debug;
+    private final List<String> hlocks;
+    private final List<String> wlocks;
+    private final String top;
+
+    private TransactionStatus(Long tid, TStatus status, String debug, List<String> hlocks, List<String> wlocks, String top) {
+      this.txid = tid;
+      this.status = status;
+      this.debug = debug;
+      this.hlocks = Collections.unmodifiableList(hlocks);
+      this.wlocks = Collections.unmodifiableList(wlocks);
+      this.top = top;
+    }
+
+    public long getTxid() {
+      return txid;
+    }
+
+    public TStatus getStatus() {
+      return status;
+    }
+
+    public String getDebug() {
+      return debug;
+    }
+
+    public List<String> getHeldLocks() {
+      return hlocks;
+    }
+
+    public List<String> getWaitingLocks() {
+      return wlocks;
+    }
+
+    public String getTop() {
+      return top;
+    }
+  }
 
-  public void print(ReadOnlyTStore<T> zs, IZooReaderWriter zk, String lockPath, Formatter fmt, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
+  public static class FateStatus {
+
+    private final List<TransactionStatus> transactions;
+    private final Map<Long,List<String>> danglingHeldLocks;
+    private final Map<Long,List<String>> danglingWaitingLocks;
+
+    private FateStatus(List<TransactionStatus> transactions, Map<Long,List<String>> danglingHeldLocks, Map<Long,List<String>> danglingWaitingLocks) {
+      this.transactions = Collections.unmodifiableList(transactions);
+      this.danglingHeldLocks = Collections.unmodifiableMap(danglingHeldLocks);
+      this.danglingWaitingLocks = Collections.unmodifiableMap(danglingWaitingLocks);
+
+    }
+
+    public List<TransactionStatus> getTransactions() {
+      return transactions;
+    }
+
+    public Map<Long,List<String>> getDanglingHeldLocks() {
+      return danglingHeldLocks;
+    }
+
+    public Map<Long,List<String>> getDanglingWaitingLocks() {
+      return danglingWaitingLocks;
+    }
+  }
+
+  public FateStatus getStatus(ReadOnlyTStore<T> zs, IZooReader zk, String lockPath, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
       throws KeeperException, InterruptedException {
     Map<Long,List<String>> heldLocks = new HashMap<>();
     Map<Long,List<String>> waitingLocks = new HashMap<>();
@@ -118,13 +185,12 @@ public class AdminUtil<T> {
 
       } catch (Exception e) {
         log.error("Failed to read locks for " + id + " continuing.", e);
-        fmt.format("Failed to read locks for %s continuing", id);
       }
     }
 
     List<Long> transactions = zs.list();
+    List<TransactionStatus> statuses = new ArrayList<>(transactions.size());
 
-    long txCount = 0;
     for (Long tid : transactions) {
 
       zs.reserve(tid);
@@ -152,17 +218,33 @@ public class AdminUtil<T> {
       if ((filterTxid != null && !filterTxid.contains(tid)) || (filterStatus != null && !filterStatus.contains(status)))
         continue;
 
-      ++txCount;
-      fmt.format("txid: %016x status: %-18s op: %-15s locked: %-15s locking: %-15s top: %s%n", tid, status, debug, hlocks, wlocks, top);
+      statuses.add(new TransactionStatus(tid, status, debug, hlocks, wlocks, top));
+    }
+
+    return new FateStatus(statuses, heldLocks, waitingLocks);
+  }
+
+  public void print(ReadOnlyTStore<T> zs, IZooReader zk, String lockPath) throws KeeperException, InterruptedException {
+    print(zs, zk, lockPath, new Formatter(System.out), null, null);
+  }
+
+  public void print(ReadOnlyTStore<T> zs, IZooReader zk, String lockPath, Formatter fmt, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
+      throws KeeperException, InterruptedException {
+
+    FateStatus fateStatus = getStatus(zs, zk, lockPath, filterTxid, filterStatus);
+
+    for (TransactionStatus txStatus : fateStatus.getTransactions()) {
+      fmt.format("txid: %016x status: %-18s op: %-15s locked: %-15s locking: %-15s top: %s%n", txStatus.getTxid(), txStatus.getStatus(),
+          txStatus.getDebug(), txStatus.getHeldLocks(), txStatus.getWaitingLocks(), txStatus.getTop());
     }
 
-    fmt.format(" %s transactions", txCount);
+    fmt.format(" %s transactions", fateStatus.getTransactions().size());
 
-    if (heldLocks.size() != 0 || waitingLocks.size() != 0) {
+    if (fateStatus.getDanglingHeldLocks().size() != 0 || fateStatus.getDanglingWaitingLocks().size() != 0) {
       fmt.format("%nThe following locks did not have an associated FATE operation%n");
-      for (Entry<Long,List<String>> entry : heldLocks.entrySet())
+      for (Entry<Long,List<String>> entry : fateStatus.getDanglingHeldLocks().entrySet())
         fmt.format("txid: %016x locked: %s%n", entry.getKey(), entry.getValue());
-      for (Entry<Long,List<String>> entry : waitingLocks.entrySet())
+      for (Entry<Long,List<String>> entry : fateStatus.getDanglingWaitingLocks().entrySet())
         fmt.format("txid: %016x locking: %s%n", entry.getKey(), entry.getValue());
     }
   }
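The getStatus(...) method added above exposes the FATE transaction and lock summary programmatically, where before it was only available through the Formatter-based print(...). A minimal sketch of how a caller might use it to check for dangling locks, mirroring the getFateStatus() helper in the new IT further down; the class and method names here are illustrative, only the Accumulo APIs shown in this diff are assumed:

  import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.client.Instance;
  import org.apache.accumulo.core.zookeeper.ZooUtil;
  import org.apache.accumulo.fate.AdminUtil;
  import org.apache.accumulo.fate.AdminUtil.FateStatus;
  import org.apache.accumulo.fate.ZooStore;
  import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
  import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;

  public class FateLockCheck {
    // Illustrative helper: summarize FATE state and flag locks with no associated transaction.
    static FateStatus checkFate(Instance instance, String secret) throws Exception {
      IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
      FateStatus status = new AdminUtil<String>(false).getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null);
      // Locks without an associated FATE operation are exactly what the fixed bug could leave behind.
      if (!status.getDanglingHeldLocks().isEmpty() || !status.getDanglingWaitingLocks().isEmpty()) {
        System.err.println("Dangling FATE locks: held=" + status.getDanglingHeldLocks() + " waiting=" + status.getDanglingWaitingLocks());
      }
      return status;
    }
  }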
http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
index 767633b..8e80358 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
@@ -23,6 +23,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -57,6 +58,11 @@ public interface AccumuloCluster {
   ClientConfiguration getClientConfig();
 
   /**
+   * Get server side config derived from accumulo-site.xml
+   */
+  AccumuloConfiguration getSiteConfiguration();
+
+  /**
    * Get an object that can manage a cluster
    *
    * @return Manage the state of the cluster

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
index 1baa3a1..ad84f2f 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
@@ -32,6 +32,8 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.master.thrift.MasterGoalState;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.minicluster.ServerType;
@@ -41,6 +43,8 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterables;
+
 /**
  * AccumuloCluster implementation to connect to an existing deployment of Accumulo
  */
@@ -184,4 +188,12 @@ public class StandaloneAccumuloCluster implements AccumuloCluster {
     checkArgument(offset >= 0 && offset < users.size(), "Invalid offset, should be non-negative and less than " + users.size());
     return users.get(offset);
   }
+
+  @Override
+  public AccumuloConfiguration getSiteConfiguration() {
+    Configuration conf = new Configuration(false);
+    Path accumuloSite = new Path(serverAccumuloConfDir, "accumulo-site.xml");
+    conf.addResource(accumuloSite);
+    return new ConfigurationCopy(Iterables.concat(AccumuloConfiguration.getDefaultConfiguration(), conf));
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
index f2e5c7c..79ad527 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
@@ -66,6 +66,7 @@ import org.apache.accumulo.core.client.impl.MasterClient;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -110,6 +111,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
 /**
@@ -820,4 +822,10 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
       return new Path(tmp.toString());
     }
   }
+
+  @Override
+  public AccumuloConfiguration getSiteConfiguration() {
+    // TODO Auto-generated method stub
+    return new ConfigurationCopy(Iterables.concat(AccumuloConfiguration.getDefaultConfiguration(), config.getSiteConfig().entrySet()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
index 09a90b5..5f0ddd2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
@@ -254,7 +254,7 @@ class FateServiceHandler implements FateService.Iface {
         if (!canDeleteTable)
           throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 
-        master.fate.seedTransaction(opid, new TraceRepo<>(new DeleteTable(tableId)), autoCleanup);
+        master.fate.seedTransaction(opid, new TraceRepo<>(new DeleteTable(namespaceId, tableId)), autoCleanup);
         break;
       }
       case TABLE_ONLINE: {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
index a1158f4..1eae5b9 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.master.tableOps;
 
-import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.fate.Repo;
@@ -28,20 +27,32 @@ public class DeleteTable extends MasterRepo {
   private static final long serialVersionUID = 1L;
 
   private String tableId;
+  private String namespaceId;
 
-  public DeleteTable(String tableId) {
+  private String getNamespaceId(Master environment) throws Exception {
+    if (namespaceId == null) {
+      // For ACCUMULO-4575 namespaceId was added in a bug fix release. Since it was added in bug fix release, we have to ensure we can properly deserialize
+      // older versions. When deserializing an older version, namespaceId will be null. For this case revert to the old buggy behavior.
+      return Utils.getNamespaceId(environment.getInstance(), tableId, TableOperation.DELETE);
+    }
+
+    return namespaceId;
+  }
+
+  public DeleteTable(String namespaceId, String tableId) {
+    this.namespaceId = namespaceId;
     this.tableId = tableId;
   }
 
   @Override
   public long isReady(long tid, Master environment) throws Exception {
-    String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
+    String namespaceId = getNamespaceId(environment);
     return Utils.reserveNamespace(namespaceId, tid, false, false, TableOperation.DELETE) + Utils.reserveTable(tableId, tid, true, true, TableOperation.DELETE);
   }
 
   @Override
   public Repo<Master> call(long tid, Master environment) throws Exception {
-    String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
+    String namespaceId = getNamespaceId(environment);
     TableManager.getInstance().transitionTableState(tableId, TableState.DELETING);
     environment.getEventCoordinator().event("deleting table %s ", tableId);
     return new CleanUp(tableId, namespaceId);
@@ -49,9 +60,9 @@ public class DeleteTable extends MasterRepo {
 
   @Override
   public void undo(long tid, Master environment) throws Exception {
-    String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
-    Utils.unreserveNamespace(namespaceId, tid, false);
+    if (namespaceId != null) {
+      Utils.unreserveNamespace(namespaceId, tid, false);
+    }
     Utils.unreserveTable(tableId, tid, true);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
index 0fb9138..9b921e2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
@@ -116,6 +116,19 @@ public class Utils {
     return 100;
   }
 
+  public static String getNamespaceId(Instance instance, String tableId, TableOperation op) throws Exception {
+    try {
+      return Tables.getNamespaceId(instance, tableId);
+    } catch (RuntimeException e) {
+      // see if this was caused because the table does not exists
+      IZooReaderWriter zk = ZooReaderWriter.getInstance();
+      if (!zk.exists(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId))
+        throw new ThriftTableOperationException(tableId, "", op, TableOperationExceptionType.NOTFOUND, "Table does not exist");
+      else
+        throw e;
+    }
+  }
+
   public static long reserveHdfsDirectory(String directory, long tid) throws KeeperException, InterruptedException {
     Instance instance = HdfsZooInstance.getInstance();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java
index efed7a4..8621ab1 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java
@@ -19,10 +19,12 @@ package org.apache.accumulo.test.functional;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
 import org.junit.Test;
 
 public class BackupMasterIT extends ConfigurableMacIT {
@@ -39,7 +41,8 @@ public class BackupMasterIT extends ConfigurableMacIT {
     // create a backup
     Process backup = exec(Master.class);
     try {
-      ZooReaderWriter writer = new ZooReaderWriter(cluster.getZooKeepers(), 30 * 1000, "digest", "accumulo:DONTTELL".getBytes());
+      String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET);
+      IZooReaderWriter writer = new ZooReaderWriterFactory().getZooReaderWriter(cluster.getZooKeepers(), 30 * 1000, secret);
       String root = "/accumulo/" + getConnector().getInstance().getInstanceID();
       List<String> children = Collections.emptyList();
       // wait for 2 lock entries

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
new file mode 100644
index 0000000..4798095
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.functional;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.AdminUtil.FateStatus;
+import org.apache.accumulo.fate.ZooStore;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConcurrentDeleteTableIT extends AccumuloClusterIT {
+
+  @Test
+  public void testConcurrentDeleteTablesOps() throws Exception {
+    final Connector c = getConnector();
+    String[] tables = getUniqueNames(2);
+
+    TreeSet<Text> splits = new TreeSet<>();
+
+    for (int i = 0; i < 1000; i++) {
+      Text split = new Text(String.format("%09x", i * 100000));
+      splits.add(split);
+    }
+
+    ExecutorService es = Executors.newFixedThreadPool(20);
+
+    int count = 0;
+    for (final String table : tables) {
+      c.tableOperations().create(table);
+      c.tableOperations().addSplits(table, splits);
+      writeData(c, table);
+      if (count == 1) {
+        c.tableOperations().flush(table, null, null, true);
+      }
+      count++;
+
+      final CountDownLatch cdl = new CountDownLatch(20);
+
+      List<Future<?>> futures = new ArrayList<>();
+
+      for (int i = 0; i < 20; i++) {
+        Future<?> future = es.submit(new Runnable() {
+
+          @Override
+          public void run() {
+            try {
+              cdl.countDown();
+              cdl.await();
+              c.tableOperations().delete(table);
+            } catch (TableNotFoundException e) {
+              // expected
+            } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+
+        futures.add(future);
+      }
+
+      for (Future<?> future : futures) {
+        future.get();
+      }
+
+      try {
+        c.createScanner(table, Authorizations.EMPTY);
+        Assert.fail("Expected table " + table + " to be gone.");
+      } catch (TableNotFoundException tnfe) {
+        // expected
+      }
+
+      FateStatus fateStatus = getFateStatus();
+
+      // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks.
+      Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
+      Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
+    }
+
+    es.shutdown();
+  }
+
+  private FateStatus getFateStatus() throws KeeperException, InterruptedException {
+    Instance instance = getConnector().getInstance();
+    AdminUtil<String> admin = new AdminUtil<>(false);
+    String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET);
+    IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
+    ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
+    FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null);
+    return fateStatus;
+  }
+
+  private void writeData(Connector c, String table) throws TableNotFoundException, MutationsRejectedException {
+    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+    try {
+      Random rand = new Random();
+      for (int i = 0; i < 1000; i++) {
+        Mutation m = new Mutation(String.format("%09x", rand.nextInt(100000 * 1000)));
+        m.put("m", "order", "" + i);
+        bw.addMutation(m);
+      }
+    } finally {
+      bw.close();
+    }
+  }
+}
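The DeleteTable change earlier in this commit rests on a point its inline comment calls out: FATE repos are serialized into ZooKeeper, so an operation seeded before an upgrade may be deserialized by a master running this fix, in which case the new namespaceId field comes back null and the code falls back to looking the namespace up by table id. A stripped-down sketch of that pattern; the class, field, and interface names below are illustrative, not the committed Accumulo code:

  import java.io.Serializable;

  // Illustrative only: models the "new field may be null after deserializing an older version"
  // handling used by DeleteTable in this commit.
  class DeleteOp implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String tableId;
    private final String namespaceId; // added in a later release; null when an old serialized form is read

    DeleteOp(String namespaceId, String tableId) {
      this.namespaceId = namespaceId;
      this.tableId = tableId;
    }

    // Resolve lazily so an operation seeded by an older master can still complete.
    String namespaceId(NamespaceLookup lookup) throws Exception {
      return namespaceId == null ? lookup.namespaceForTable(tableId) : namespaceId;
    }

    interface NamespaceLookup {
      String namespaceForTable(String tableId) throws Exception;
    }
  }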