Repository: accumulo Updated Branches: refs/heads/master adb26259e -> 96827a57e
ACCUMULO-4578 release namespace lock when compaction canceled Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/db84650e Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/db84650e Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/db84650e Branch: refs/heads/master Commit: db84650e7454b8354a28d0dcda8da1235a6ea175 Parents: 7b9a11a Author: Keith Turner <ktur...@apache.org> Authored: Thu Jan 26 21:42:03 2017 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Thu Jan 26 21:42:03 2017 -0500 ---------------------------------------------------------------------- .../org/apache/accumulo/fate/AdminUtil.java | 33 +++++++++++++------- .../master/tableOps/CancelCompactions.java | 2 +- .../master/tableOps/FinishCancelCompaction.java | 12 +++++-- .../apache/accumulo/test/TableOperationsIT.java | 7 +++++ .../accumulo/test/UserCompactionStrategyIT.java | 6 ++++ .../functional/ConcurrentDeleteTableIT.java | 32 ++----------------- .../test/functional/FateStarvationIT.java | 2 ++ .../test/functional/FunctionalTestUtils.java | 29 +++++++++++++++++ .../accumulo/test/functional/RenameIT.java | 2 ++ 9 files changed, 81 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java index b8baa67..6d388ed 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java +++ b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java @@ -110,25 +110,36 @@ public class AdminUtil<T> { public static class FateStatus { private final List<TransactionStatus> transactions; - private final Map<Long,List<String>> danglingHeldLocks; - private final Map<Long,List<String>> danglingWaitingLocks; + private final Map<String,List<String>> danglingHeldLocks; + private final Map<String,List<String>> danglingWaitingLocks; + + private static Map<String,List<String>> convert(Map<Long,List<String>> danglocks) { + if (danglocks.isEmpty()) { + return Collections.emptyMap(); + } + + Map<String,List<String>> ret = new HashMap<>(); + for (Entry<Long,List<String>> entry : danglocks.entrySet()) { + ret.put(String.format("%016x", entry.getKey()), Collections.unmodifiableList(entry.getValue())); + } + return Collections.unmodifiableMap(ret); + } private FateStatus(List<TransactionStatus> transactions, Map<Long,List<String>> danglingHeldLocks, Map<Long,List<String>> danglingWaitingLocks) { this.transactions = Collections.unmodifiableList(transactions); - this.danglingHeldLocks = Collections.unmodifiableMap(danglingHeldLocks); - this.danglingWaitingLocks = Collections.unmodifiableMap(danglingWaitingLocks); - + this.danglingHeldLocks = convert(danglingHeldLocks); + this.danglingWaitingLocks = convert(danglingWaitingLocks); } public List<TransactionStatus> getTransactions() { return transactions; } - public Map<Long,List<String>> getDanglingHeldLocks() { + public Map<String,List<String>> getDanglingHeldLocks() { return danglingHeldLocks; } - public Map<Long,List<String>> getDanglingWaitingLocks() { + public Map<String,List<String>> getDanglingWaitingLocks() { return danglingWaitingLocks; } } @@ -241,11 +252,11 @@ public class AdminUtil<T> { if (fateStatus.getDanglingHeldLocks().size() != 0 || fateStatus.getDanglingWaitingLocks().size() != 0) { fmt.format("%nThe following locks did not have an associated FATE operation%n"); - for (Entry<Long,List<String>> entry : fateStatus.getDanglingHeldLocks().entrySet()) - fmt.format("txid: %016x locked: %s%n", entry.getKey(), entry.getValue()); + for (Entry<String,List<String>> entry : fateStatus.getDanglingHeldLocks().entrySet()) + fmt.format("txid: %s locked: %s%n", entry.getKey(), entry.getValue()); - for (Entry<Long,List<String>> entry : fateStatus.getDanglingWaitingLocks().entrySet()) - fmt.format("txid: %016x locking: %s%n", entry.getKey(), entry.getValue()); + for (Entry<String,List<String>> entry : fateStatus.getDanglingWaitingLocks().entrySet()) + fmt.format("txid: %s locking: %s%n", entry.getKey(), entry.getValue()); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java index c98174e..42d2699 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java @@ -73,7 +73,7 @@ public class CancelCompactions extends MasterRepo { } }); - return new FinishCancelCompaction(tableId); + return new FinishCancelCompaction(getNamespaceId(environment), tableId); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java index 45fc8df..2bb34d2 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java @@ -16,20 +16,28 @@ */ package org.apache.accumulo.master.tableOps; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.fate.Repo; import org.apache.accumulo.master.Master; class FinishCancelCompaction extends MasterRepo { private static final long serialVersionUID = 1L; private String tableId; + private String namespaceId; - public FinishCancelCompaction(String tableId) { + private String getNamespaceId(Master env) throws Exception { + return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.COMPACT_CANCEL, this.namespaceId); + } + + public FinishCancelCompaction(String namespaceId, String tableId) { this.tableId = tableId; + this.namespaceId = namespaceId; } @Override public Repo<Master> call(long tid, Master environment) throws Exception { - Utils.getReadLock(tableId, tid).unlock(); + Utils.unreserveTable(tableId, tid, false); + Utils.unreserveNamespace(getNamespaceId(environment), tid, false); return null; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java b/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java index 54cb738..0d91bb0 100644 --- a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java +++ b/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java @@ -58,8 +58,10 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.AccumuloClusterIT; import org.apache.accumulo.test.functional.BadIterator; +import org.apache.accumulo.test.functional.FunctionalTestUtils; import org.apache.hadoop.io.Text; import org.apache.thrift.TException; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -82,6 +84,11 @@ public class TableOperationsIT extends AccumuloClusterIT { connector = getConnector(); } + @After + public void checkForDanglingFateLocks() { + FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); + } + @Test public void getDiskUsageErrors() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TException { String tableName = getUniqueNames(1)[0]; http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java index 844a4d2..2d1bd15 100644 --- a/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java +++ b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java @@ -44,6 +44,7 @@ import org.apache.accumulo.harness.AccumuloClusterIT; import org.apache.accumulo.test.functional.FunctionalTestUtils; import org.apache.accumulo.test.functional.SlowIterator; import org.apache.hadoop.io.Text; +import org.junit.After; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -58,6 +59,11 @@ public class UserCompactionStrategyIT extends AccumuloClusterIT { return 3 * 60; } + @After + public void checkForDanglingFateLocks() { + FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); + } + @Test public void testDropA() throws Exception { Connector c = getConnector(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java index 0c63e59..0116f64 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java @@ -29,29 +29,19 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.admin.CompactionConfig; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.zookeeper.ZooUtil; -import org.apache.accumulo.fate.AdminUtil; -import org.apache.accumulo.fate.AdminUtil.FateStatus; -import org.apache.accumulo.fate.ZooStore; -import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import org.apache.accumulo.harness.AccumuloClusterIT; -import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory; import org.apache.hadoop.io.Text; -import org.apache.zookeeper.KeeperException; import org.junit.Assert; import org.junit.Test; @@ -112,11 +102,7 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterIT { // expected } - FateStatus fateStatus = getFateStatus(); - - // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks. - Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size()); - Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size()); + FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); } es.shutdown(); @@ -262,26 +248,12 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterIT { // expected } - FateStatus fateStatus = getFateStatus(); - - // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks. - Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size()); - Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size()); + FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); } es.shutdown(); } - private FateStatus getFateStatus() throws KeeperException, InterruptedException { - Instance instance = getConnector().getInstance(); - AdminUtil<String> admin = new AdminUtil<>(false); - String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET); - IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret); - ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk); - FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null); - return fateStatus; - } - private void writeData(Connector c, String table) throws TableNotFoundException, MutationsRejectedException { BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig()); try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java b/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java index 7eb7b89..def1a2c 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java @@ -75,6 +75,8 @@ public class FateStarvationIT extends AccumuloClusterIT { } c.tableOperations().offline(tableName); + + FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index c548f2f..2fea4c6 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -32,9 +32,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.accumulo.cluster.AccumuloCluster; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.BatchWriterOpts; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; @@ -42,13 +46,20 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.AdminUtil; +import org.apache.accumulo.fate.AdminUtil.FateStatus; +import org.apache.accumulo.fate.ZooStore; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl.LogWriter; +import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory; import org.apache.accumulo.test.TestIngest; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.zookeeper.KeeperException; import org.junit.Assert; import com.google.common.collect.Iterators; @@ -189,4 +200,22 @@ public class FunctionalTestUtils { return result; } + public static void assertNoDanglingFateLocks(Instance instance, AccumuloCluster cluster) { + FateStatus fateStatus = getFateStatus(instance, cluster); + Assert.assertEquals("Dangling FATE locks : " + fateStatus.getDanglingHeldLocks(), 0, fateStatus.getDanglingHeldLocks().size()); + Assert.assertEquals("Dangling FATE locks : " + fateStatus.getDanglingWaitingLocks(), 0, fateStatus.getDanglingWaitingLocks().size()); + } + + private static FateStatus getFateStatus(Instance instance, AccumuloCluster cluster) { + try { + AdminUtil<String> admin = new AdminUtil<>(false); + String secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET); + IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret); + ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk); + FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null); + return fateStatus; + } catch (KeeperException | InterruptedException e) { + throw new RuntimeException(e); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java b/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java index 6befd7e..e2ad7ae 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java @@ -69,6 +69,8 @@ public class RenameIT extends AccumuloClusterIT { c.tableOperations().rename(name2, name1); vopts.setTableName(name1); VerifyIngest.verifyIngest(c, vopts, scanOpts); + + FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); } }