Repository: accumulo
Updated Branches:
  refs/heads/1.7 7b9a11ad4 -> 6d8a5fa59


ACCUMULO-4578 release namespace lock when compaction canceled


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/db84650e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/db84650e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/db84650e

Branch: refs/heads/1.7
Commit: db84650e7454b8354a28d0dcda8da1235a6ea175
Parents: 7b9a11a
Author: Keith Turner <ktur...@apache.org>
Authored: Thu Jan 26 21:42:03 2017 -0500
Committer: Keith Turner <ktur...@apache.org>
Committed: Thu Jan 26 21:42:03 2017 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/fate/AdminUtil.java     | 33 +++++++++++++-------
 .../master/tableOps/CancelCompactions.java      |  2 +-
 .../master/tableOps/FinishCancelCompaction.java | 12 +++++--
 .../apache/accumulo/test/TableOperationsIT.java |  7 +++++
 .../accumulo/test/UserCompactionStrategyIT.java |  6 ++++
 .../functional/ConcurrentDeleteTableIT.java     | 32 ++-----------------
 .../test/functional/FateStarvationIT.java       |  2 ++
 .../test/functional/FunctionalTestUtils.java    | 29 +++++++++++++++++
 .../accumulo/test/functional/RenameIT.java      |  2 ++
 9 files changed, 81 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
index b8baa67..6d388ed 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
@@ -110,25 +110,36 @@ public class AdminUtil<T> {
   public static class FateStatus {
 
     private final List<TransactionStatus> transactions;
-    private final Map<Long,List<String>> danglingHeldLocks;
-    private final Map<Long,List<String>> danglingWaitingLocks;
+    private final Map<String,List<String>> danglingHeldLocks;
+    private final Map<String,List<String>> danglingWaitingLocks;
+
+    private static Map<String,List<String>> convert(Map<Long,List<String>> 
danglocks) {
+      if (danglocks.isEmpty()) {
+        return Collections.emptyMap();
+      }
+
+      Map<String,List<String>> ret = new HashMap<>();
+      for (Entry<Long,List<String>> entry : danglocks.entrySet()) {
+        ret.put(String.format("%016x", entry.getKey()), 
Collections.unmodifiableList(entry.getValue()));
+      }
+      return Collections.unmodifiableMap(ret);
+    }
 
     private FateStatus(List<TransactionStatus> transactions, 
Map<Long,List<String>> danglingHeldLocks, Map<Long,List<String>> 
danglingWaitingLocks) {
       this.transactions = Collections.unmodifiableList(transactions);
-      this.danglingHeldLocks = Collections.unmodifiableMap(danglingHeldLocks);
-      this.danglingWaitingLocks = 
Collections.unmodifiableMap(danglingWaitingLocks);
-
+      this.danglingHeldLocks = convert(danglingHeldLocks);
+      this.danglingWaitingLocks = convert(danglingWaitingLocks);
     }
 
     public List<TransactionStatus> getTransactions() {
       return transactions;
     }
 
-    public Map<Long,List<String>> getDanglingHeldLocks() {
+    public Map<String,List<String>> getDanglingHeldLocks() {
       return danglingHeldLocks;
     }
 
-    public Map<Long,List<String>> getDanglingWaitingLocks() {
+    public Map<String,List<String>> getDanglingWaitingLocks() {
       return danglingWaitingLocks;
     }
   }
@@ -241,11 +252,11 @@ public class AdminUtil<T> {
 
     if (fateStatus.getDanglingHeldLocks().size() != 0 || 
fateStatus.getDanglingWaitingLocks().size() != 0) {
       fmt.format("%nThe following locks did not have an associated FATE 
operation%n");
-      for (Entry<Long,List<String>> entry : 
fateStatus.getDanglingHeldLocks().entrySet())
-        fmt.format("txid: %016x  locked: %s%n", entry.getKey(), 
entry.getValue());
+      for (Entry<String,List<String>> entry : 
fateStatus.getDanglingHeldLocks().entrySet())
+        fmt.format("txid: %s  locked: %s%n", entry.getKey(), entry.getValue());
 
-      for (Entry<Long,List<String>> entry : 
fateStatus.getDanglingWaitingLocks().entrySet())
-        fmt.format("txid: %016x  locking: %s%n", entry.getKey(), 
entry.getValue());
+      for (Entry<String,List<String>> entry : 
fateStatus.getDanglingWaitingLocks().entrySet())
+        fmt.format("txid: %s  locking: %s%n", entry.getKey(), 
entry.getValue());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
index c98174e..42d2699 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
@@ -73,7 +73,7 @@ public class CancelCompactions extends MasterRepo {
       }
     });
 
-    return new FinishCancelCompaction(tableId);
+    return new FinishCancelCompaction(getNamespaceId(environment), tableId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java
index 45fc8df..2bb34d2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java
@@ -16,20 +16,28 @@
  */
 package org.apache.accumulo.master.tableOps;
 
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 
 class FinishCancelCompaction extends MasterRepo {
   private static final long serialVersionUID = 1L;
   private String tableId;
+  private String namespaceId;
 
-  public FinishCancelCompaction(String tableId) {
+  private String getNamespaceId(Master env) throws Exception {
+    return Utils.getNamespaceId(env.getInstance(), tableId, 
TableOperation.COMPACT_CANCEL, this.namespaceId);
+  }
+
+  public FinishCancelCompaction(String namespaceId, String tableId) {
     this.tableId = tableId;
+    this.namespaceId = namespaceId;
   }
 
   @Override
   public Repo<Master> call(long tid, Master environment) throws Exception {
-    Utils.getReadLock(tableId, tid).unlock();
+    Utils.unreserveTable(tableId, tid, false);
+    Utils.unreserveNamespace(getNamespaceId(environment), tid, false);
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java b/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
index 54cb738..0d91bb0 100644
--- a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
@@ -58,8 +58,10 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.AccumuloClusterIT;
 import org.apache.accumulo.test.functional.BadIterator;
+import org.apache.accumulo.test.functional.FunctionalTestUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,6 +84,11 @@ public class TableOperationsIT extends AccumuloClusterIT {
     connector = getConnector();
   }
 
+  @After
+  public void checkForDanglingFateLocks() {
+    
FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), 
getCluster());
+  }
+
   @Test
   public void getDiskUsageErrors() throws TableExistsException, 
AccumuloException, AccumuloSecurityException, TableNotFoundException, 
TException {
     String tableName = getUniqueNames(1)[0];

http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
index 844a4d2..2d1bd15 100644
--- a/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.harness.AccumuloClusterIT;
 import org.apache.accumulo.test.functional.FunctionalTestUtils;
 import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.hadoop.io.Text;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
@@ -58,6 +59,11 @@ public class UserCompactionStrategyIT extends AccumuloClusterIT {
     return 3 * 60;
   }
 
+  @After
+  public void checkForDanglingFateLocks() {
+    
FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), 
getCluster());
+  }
+
   @Test
   public void testDropA() throws Exception {
     Connector c = getConnector();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
index 0c63e59..0116f64 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
@@ -29,29 +29,19 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
-import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.AdminUtil;
-import org.apache.accumulo.fate.AdminUtil.FateStatus;
-import org.apache.accumulo.fate.ZooStore;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.harness.AccumuloClusterIT;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
 import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -112,11 +102,7 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterIT {
         // expected
       }
 
-      FateStatus fateStatus = getFateStatus();
-
-      // ensure there are no dangling locks... before ACCUMULO-4575 was fixed 
concurrent delete tables could fail and leave dangling locks.
-      Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
-      Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
+      
FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), 
getCluster());
     }
 
     es.shutdown();
@@ -262,26 +248,12 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterIT {
         // expected
       }
 
-      FateStatus fateStatus = getFateStatus();
-
-      // ensure there are no dangling locks... before ACCUMULO-4575 was fixed 
concurrent delete tables could fail and leave dangling locks.
-      Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
-      Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
+      
FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), 
getCluster());
     }
 
     es.shutdown();
   }
 
-  private FateStatus getFateStatus() throws KeeperException, 
InterruptedException {
-    Instance instance = getConnector().getInstance();
-    AdminUtil<String> admin = new AdminUtil<>(false);
-    String secret = 
getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET);
-    IZooReaderWriter zk = new 
ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), 
instance.getZooKeepersSessionTimeOut(), secret);
-    ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + 
Constants.ZFATE, zk);
-    FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) 
+ Constants.ZTABLE_LOCKS, null, null);
-    return fateStatus;
-  }
-
   private void writeData(Connector c, String table) throws 
TableNotFoundException, MutationsRejectedException {
     BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
     try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java b/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java
index 7eb7b89..def1a2c 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java
@@ -75,6 +75,8 @@ public class FateStarvationIT extends AccumuloClusterIT {
     }
 
     c.tableOperations().offline(tableName);
+
+    
FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), 
getCluster());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index c548f2f..2fea4c6 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@ -32,9 +32,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.accumulo.cluster.AccumuloCluster;
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.cli.BatchWriterOpts;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -42,13 +46,20 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.AdminUtil.FateStatus;
+import org.apache.accumulo.fate.ZooStore;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl.LogWriter;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
 import org.apache.accumulo.test.TestIngest;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Assert;
 
 import com.google.common.collect.Iterators;
@@ -189,4 +200,22 @@ public class FunctionalTestUtils {
     return result;
   }
 
+  public static void assertNoDanglingFateLocks(Instance instance, 
AccumuloCluster cluster) {
+    FateStatus fateStatus = getFateStatus(instance, cluster);
+    Assert.assertEquals("Dangling FATE locks : " + 
fateStatus.getDanglingHeldLocks(), 0, fateStatus.getDanglingHeldLocks().size());
+    Assert.assertEquals("Dangling FATE locks : " + 
fateStatus.getDanglingWaitingLocks(), 0, 
fateStatus.getDanglingWaitingLocks().size());
+  }
+
+  private static FateStatus getFateStatus(Instance instance, AccumuloCluster 
cluster) {
+    try {
+      AdminUtil<String> admin = new AdminUtil<>(false);
+      String secret = 
cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
+      IZooReaderWriter zk = new 
ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), 
instance.getZooKeepersSessionTimeOut(), secret);
+      ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + 
Constants.ZFATE, zk);
+      FateStatus fateStatus = admin.getStatus(zs, zk, 
ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null);
+      return fateStatus;
+    } catch (KeeperException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java b/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java
index 6befd7e..e2ad7ae 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java
@@ -69,6 +69,8 @@ public class RenameIT extends AccumuloClusterIT {
     c.tableOperations().rename(name2, name1);
     vopts.setTableName(name1);
     VerifyIngest.verifyIngest(c, vopts, scanOpts);
+
+    
FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), 
getCluster());
   }
 
 }

Reply via email to