This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 8c52daf Revert "Modify announceExistence to use ephemeral node and no lock" but retain minor edits 8c52daf is described below commit 8c52dafa00974cd0705165dccdce58b8627dcdab Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed May 5 16:10:27 2021 +0000 Revert "Modify announceExistence to use ephemeral node and no lock" but retain minor edits This reverts commit 4296d2ab70a75ae50420c2f2c7e5a850142fd6cd. --- .../util/compaction/ExternalCompactionUtil.java | 10 +++- .../org/apache/accumulo/compactor/Compactor.java | 57 +++++++++++++++++++++- .../apache/accumulo/test/ExternalCompactionIT.java | 2 +- 3 files changed, 65 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 974864b..56114ef 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -100,8 +100,14 @@ public class ExternalCompactionUtil { try { List<String> compactors = zooReader.getChildren(compactorQueuesPath + "/" + queue); for (String compactor : compactors) { - LOG.debug("Found live compactor: {}", compactor); - compactAddrs.add(HostAndPort.fromString(compactor)); + // compactor is the address, we are checking to see if there is a child node which + // represents the compactor's lock as a check that it's alive. 
+ List<String> children = + zooReader.getChildren(compactorQueuesPath + "/" + queue + "/" + compactor); + if (!children.isEmpty()) { + LOG.debug("Found live compactor {} ", compactor); + compactAddrs.add(HostAndPort.fromString(compactor)); + } } } catch (NoNodeException e) { LOG.trace("Ignoring node that went missing", e); diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index c19e1d4..b51655f 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -19,6 +19,7 @@ package org.apache.accumulo.compactor; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import java.io.IOException; import java.net.UnknownHostException; @@ -63,12 +64,19 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; +import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.ServerServices; +import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.fate.zookeeper.ServiceLock; +import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason; +import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher; import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import 
org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.GarbageCollectionLogger; import org.apache.accumulo.server.ServerOpts; @@ -117,6 +125,7 @@ public class Compactor extends AbstractServer protected static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder(); private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); + private final UUID compactorId = UUID.randomUUID(); private final AccumuloConfiguration aconf; private final String queueName; private final AtomicReference<CompactionCoordinator.Client> coordinatorClient = @@ -125,6 +134,7 @@ public class Compactor extends AbstractServer new AtomicReference<>(); private SecurityOperation security; + private ServiceLock compactorLock; private ServerAddress compactorAddress = null; // Exposed for tests @@ -241,7 +251,7 @@ public class Compactor extends AbstractServer try { zoo.mkdirs(compactorQueuePath); - zoo.putEphemeralData(zPath, new byte[] {}); + zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP); } catch (KeeperException e) { if (e.code() == KeeperException.Code.NOAUTH) { LOG.error("Failed to write to ZooKeeper. 
Ensure that" @@ -249,6 +259,44 @@ public class Compactor extends AbstractServer } throw e; } + + compactorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), + ServiceLock.path(zPath), compactorId); + LockWatcher lw = new LockWatcher() { + @Override + public void lostLock(final LockLossReason reason) { + Halt.halt(1, () -> { + LOG.error("Compactor lost lock (reason = {}), exiting.", reason); + gcLogger.logGCInfo(getConfiguration()); + }); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, exiting.", e)); + } + }; + + try { + byte[] lockContent = + new ServerServices(hostPort, Service.COMPACTOR_CLIENT).toString().getBytes(UTF_8); + for (int i = 0; i < 25; i++) { + zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP); + + if (compactorLock.tryLock(lw, lockContent)) { + LOG.debug("Obtained Compactor lock {}", compactorLock.getLockPath()); + return; + } + LOG.info("Waiting for Compactor lock"); + sleepUninterruptibly(5, TimeUnit.SECONDS); + } + String msg = "Too many retries, exiting."; + LOG.info(msg); + throw new RuntimeException(msg); + } catch (Exception e) { + LOG.info("Could not obtain tablet server lock, exiting.", e); + throw new RuntimeException(e); + } } /** @@ -750,6 +798,13 @@ public class Compactor extends AbstractServer gcLogger.logGCInfo(getConfiguration()); LOG.info("stop requested. exiting ... 
"); + try { + if (null != compactorLock) { + compactorLock.unlock(); + } + } catch (Exception e) { + LOG.warn("Failed to release compactor lock", e); + } } } diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java index 4cc8b85..8ae648d 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java @@ -823,7 +823,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase { getCluster().getServerContext().getAmple().getExternalCompactionFinalStates(); while (fs.count() != 0) { LOG.info("Waiting for compaction completed marker to disappear"); - UtilWaitThread.sleep(1000); + UtilWaitThread.sleep(500); fs = getCluster().getServerContext().getAmple().getExternalCompactionFinalStates(); } try (final AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {