This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new 8c52daf  Revert "Modify announceExistence to use ephemeral node and no 
lock" but retain minor edits
8c52daf is described below

commit 8c52dafa00974cd0705165dccdce58b8627dcdab
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed May 5 16:10:27 2021 +0000

    Revert "Modify announceExistence to use ephemeral node and no lock" but 
retain minor edits
    
    This reverts commit 4296d2ab70a75ae50420c2f2c7e5a850142fd6cd.
---
 .../util/compaction/ExternalCompactionUtil.java    | 10 +++-
 .../org/apache/accumulo/compactor/Compactor.java   | 57 +++++++++++++++++++++-
 .../apache/accumulo/test/ExternalCompactionIT.java |  2 +-
 3 files changed, 65 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 974864b..56114ef 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -100,8 +100,14 @@ public class ExternalCompactionUtil {
         try {
           List<String> compactors = zooReader.getChildren(compactorQueuesPath 
+ "/" + queue);
           for (String compactor : compactors) {
-            LOG.debug("Found live compactor: {}", compactor);
-            compactAddrs.add(HostAndPort.fromString(compactor));
+            // compactor is the address, we are checking to see if there is a 
child node which
+            // represents the compactor's lock as a check that it's alive.
+            List<String> children =
+                zooReader.getChildren(compactorQueuesPath + "/" + queue + "/" 
+ compactor);
+            if (!children.isEmpty()) {
+              LOG.debug("Found live compactor {} ", compactor);
+              compactAddrs.add(HostAndPort.fromString(compactor));
+            }
           }
         } catch (NoNodeException e) {
           LOG.trace("Ignoring node that went missing", e);
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index c19e1d4..b51655f 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.compactor;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -63,12 +64,19 @@ import 
org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerOpts;
@@ -117,6 +125,7 @@ public class Compactor extends AbstractServer
   protected static final CompactionJobHolder JOB_HOLDER = new 
CompactionJobHolder();
 
   private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
+  private final UUID compactorId = UUID.randomUUID();
   private final AccumuloConfiguration aconf;
   private final String queueName;
   private final AtomicReference<CompactionCoordinator.Client> 
coordinatorClient =
@@ -125,6 +134,7 @@ public class Compactor extends AbstractServer
       new AtomicReference<>();
 
   private SecurityOperation security;
+  private ServiceLock compactorLock;
   private ServerAddress compactorAddress = null;
 
   // Exposed for tests
@@ -241,7 +251,7 @@ public class Compactor extends AbstractServer
 
     try {
       zoo.mkdirs(compactorQueuePath);
-      zoo.putEphemeralData(zPath, new byte[] {});
+      zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);
     } catch (KeeperException e) {
       if (e.code() == KeeperException.Code.NOAUTH) {
         LOG.error("Failed to write to ZooKeeper. Ensure that"
@@ -249,6 +259,44 @@ public class Compactor extends AbstractServer
       }
       throw e;
     }
+
+    compactorLock = new 
ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
+        ServiceLock.path(zPath), compactorId);
+    LockWatcher lw = new LockWatcher() {
+      @Override
+      public void lostLock(final LockLossReason reason) {
+        Halt.halt(1, () -> {
+          LOG.error("Compactor lost lock (reason = {}), exiting.", reason);
+          gcLogger.logGCInfo(getConfiguration());
+        });
+      }
+
+      @Override
+      public void unableToMonitorLockNode(final Exception e) {
+        Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, 
exiting.", e));
+      }
+    };
+
+    try {
+      byte[] lockContent =
+          new ServerServices(hostPort, 
Service.COMPACTOR_CLIENT).toString().getBytes(UTF_8);
+      for (int i = 0; i < 25; i++) {
+        zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP);
+
+        if (compactorLock.tryLock(lw, lockContent)) {
+          LOG.debug("Obtained Compactor lock {}", compactorLock.getLockPath());
+          return;
+        }
+        LOG.info("Waiting for Compactor lock");
+        sleepUninterruptibly(5, TimeUnit.SECONDS);
+      }
+      String msg = "Too many retries, exiting.";
+      LOG.info(msg);
+      throw new RuntimeException(msg);
+    } catch (Exception e) {
+      LOG.info("Could not obtain tablet server lock, exiting.", e);
+      throw new RuntimeException(e);
+    }
   }
 
   /**
@@ -750,6 +798,13 @@ public class Compactor extends AbstractServer
 
       gcLogger.logGCInfo(getConfiguration());
       LOG.info("stop requested. exiting ... ");
+      try {
+        if (null != compactorLock) {
+          compactorLock.unlock();
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to release compactor lock", e);
+      }
     }
 
   }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index 4cc8b85..8ae648d 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -823,7 +823,7 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
         
getCluster().getServerContext().getAmple().getExternalCompactionFinalStates();
     while (fs.count() != 0) {
       LOG.info("Waiting for compaction completed marker to disappear");
-      UtilWaitThread.sleep(1000);
+      UtilWaitThread.sleep(500);
       fs = 
getCluster().getServerContext().getAmple().getExternalCompactionFinalStates();
     }
     try (final AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {

Reply via email to