This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new b3424a86b9 Made lock acquisition consistent (#5224)
b3424a86b9 is described below
commit b3424a86b9cc62cfba3ca5eb5f3eef249c82fa30
Author: Dave Marion <[email protected]>
AuthorDate: Mon Jan 6 14:08:55 2025 -0500
Made lock acquisition consistent (#5224)
Modified SimpleGarbageCollector to acquire the service
lock in the same manner that other HA services acquire
the service lock.
Created a new class called ServiceLockSupport that is
currently used to hold LockWatcher implementations for
HA and non-HA servers.
Closes #4839
Co-authored-by: Keith Turner <[email protected]>
---
.../core/fate/zookeeper/ServiceLockSupport.java | 152 +++++++++++++++++++++
.../coordinator/CompactionCoordinator.java | 5 +-
.../coordinator/CoordinatorLockWatcher.java | 94 -------------
.../org/apache/accumulo/compactor/Compactor.java | 18 +--
.../apache/accumulo/gc/SimpleGarbageCollector.java | 41 +++---
.../java/org/apache/accumulo/manager/Manager.java | 69 +---------
.../java/org/apache/accumulo/monitor/Monitor.java | 60 +-------
.../org/apache/accumulo/tserver/ScanServer.java | 23 +---
.../org/apache/accumulo/tserver/TabletServer.java | 23 +---
9 files changed, 192 insertions(+), 293 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java
new file mode 100644
index 0000000000..b4d95a703a
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.zookeeper;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.accumulo.core.fate.zookeeper.ServiceLock.AccumuloLockWatcher;
+import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.zookeeper.KeeperException.NoAuthException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServiceLockSupport {
+
+ /**
+ * Lock Watcher used by Highly Available services. These are services where
only instance is
+ * running at a time, but another backup service can be started that will be
used if the active
+ * service instance fails and loses its lock in ZK.
+ */
+ public static class HAServiceLockWatcher implements AccumuloLockWatcher {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HAServiceLockWatcher.class);
+
+ private final String serviceName;
+ private volatile boolean acquiredLock = false;
+ private volatile boolean failedToAcquireLock = false;
+
+ public HAServiceLockWatcher(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ @Override
+ public void lostLock(LockLossReason reason) {
+ Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason +
"), exiting!", -1);
+ }
+
+ @Override
+ public void unableToMonitorLockNode(final Exception e) {
+ // ACCUMULO-3651 Changed level to error and added FATAL to message for
slf4j compatibility
+ Halt.halt(-1,
+ () -> LOG.error("FATAL: No longer able to monitor {} lock node",
serviceName, e));
+
+ }
+
+ @Override
+ public synchronized void acquiredLock() {
+ LOG.debug("Acquired {} lock", serviceName);
+
+ if (acquiredLock || failedToAcquireLock) {
+ Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " +
failedToAcquireLock, -1);
+ }
+
+ acquiredLock = true;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void failedToAcquireLock(Exception e) {
+ LOG.warn("Failed to get {} lock", serviceName, e);
+
+ if (e instanceof NoAuthException) {
+ String msg =
+ "Failed to acquire " + serviceName + " lock due to incorrect
ZooKeeper authentication.";
+ LOG.error("{} Ensure instance.secret is consistent across Accumulo
configuration", msg, e);
+ Halt.halt(msg, -1);
+ }
+
+ if (acquiredLock) {
+ Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " +
failedToAcquireLock,
+ -1);
+ }
+
+ failedToAcquireLock = true;
+ notifyAll();
+ }
+
+ public synchronized void waitForChange() {
+ while (!acquiredLock && !failedToAcquireLock) {
+ try {
+ LOG.info("{} lock held by someone else, waiting for a change in
state", serviceName);
+ wait();
+ } catch (InterruptedException e) {
+ // empty
+ }
+ }
+ }
+
+ public boolean isLockAcquired() {
+ return acquiredLock;
+ }
+
+ public boolean isFailedToAcquireLock() {
+ return failedToAcquireLock;
+ }
+
+ }
+
+ /**
+ * Lock Watcher used by non-HA services
+ */
+ public static class ServiceLockWatcher implements LockWatcher {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ServiceLockWatcher.class);
+
+ private final String serviceName;
+ private final Supplier<Boolean> shuttingDown;
+ private final Consumer<String> lostLockAction;
+
+ public ServiceLockWatcher(String serviceName, Supplier<Boolean>
shuttingDown,
+ Consumer<String> lostLockAction) {
+ this.serviceName = serviceName;
+ this.shuttingDown = shuttingDown;
+ this.lostLockAction = lostLockAction;
+ }
+
+ @Override
+ public void lostLock(final LockLossReason reason) {
+ Halt.halt(1, () -> {
+ if (!shuttingDown.get()) {
+ LOG.error("{} lost lock (reason = {}), exiting.", serviceName,
reason);
+ }
+ lostLockAction.accept(serviceName);
+ });
+ }
+
+ @Override
+ public void unableToMonitorLockNode(final Exception e) {
+ Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock,
exiting.", serviceName, e));
+ }
+
+ }
+
+}
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index b735d8544d..7d853b4808 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -58,6 +58,7 @@ import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
+import
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample;
@@ -223,11 +224,11 @@ public class CompactionCoordinator extends AbstractServer
ServiceLock.path(lockPath), zooLockUUID);
while (true) {
- CoordinatorLockWatcher coordinatorLockWatcher = new
CoordinatorLockWatcher();
+ HAServiceLockWatcher coordinatorLockWatcher = new
HAServiceLockWatcher("coordinator");
coordinatorLock.lock(coordinatorLockWatcher,
coordinatorClientAddress.getBytes(UTF_8));
coordinatorLockWatcher.waitForChange();
- if (coordinatorLockWatcher.isAcquiredLock()) {
+ if (coordinatorLockWatcher.isLockAcquired()) {
break;
}
if (!coordinatorLockWatcher.isFailedToAcquireLock()) {
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
deleted file mode 100644
index 663ecaf4a9..0000000000
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.coordinator;
-
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
-import org.apache.accumulo.core.util.Halt;
-import org.apache.zookeeper.KeeperException.NoAuthException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CoordinatorLockWatcher implements ServiceLock.AccumuloLockWatcher
{
-
- private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorLockWatcher.class);
-
- private volatile boolean acquiredLock = false;
- private volatile boolean failedToAcquireLock = false;
-
- @Override
- public void lostLock(LockLossReason reason) {
- Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "),
exiting!", -1);
- }
-
- @Override
- public void unableToMonitorLockNode(final Exception e) {
- // ACCUMULO-3651 Changed level to error and added FATAL to message for
slf4j compatibility
- Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor
Coordinator lock node", e));
-
- }
-
- @Override
- public synchronized void acquiredLock() {
- LOG.debug("Acquired Coordinator lock");
-
- if (acquiredLock || failedToAcquireLock) {
- Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " +
failedToAcquireLock, -1);
- }
-
- acquiredLock = true;
- notifyAll();
- }
-
- @Override
- public synchronized void failedToAcquireLock(Exception e) {
- LOG.warn("Failed to get Coordinator lock", e);
-
- if (e instanceof NoAuthException) {
- String msg = "Failed to acquire Coordinator lock due to incorrect
ZooKeeper authentication.";
- LOG.error("{} Ensure instance.secret is consistent across Accumulo
configuration", msg, e);
- Halt.halt(msg, -1);
- }
-
- if (acquiredLock) {
- Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " +
failedToAcquireLock, -1);
- }
-
- failedToAcquireLock = true;
- notifyAll();
- }
-
- public synchronized void waitForChange() {
- while (!acquiredLock && !failedToAcquireLock) {
- try {
- LOG.info("Coordinator lock held by someone else, waiting for a change
in state");
- wait();
- } catch (InterruptedException e) {}
- }
- }
-
- public boolean isAcquiredLock() {
- return acquiredLock;
- }
-
- public boolean isFailedToAcquireLock() {
- return failedToAcquireLock;
- }
-
-}
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 83ff5de7ee..f95fad41ab 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -66,8 +66,8 @@ import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
+import
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.file.FileOperations;
@@ -91,7 +91,6 @@ import
org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.trace.thrift.TInfo;
-import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
@@ -288,20 +287,9 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
compactorLock = new
ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
ServiceLock.path(zPath), compactorId);
- LockWatcher lw = new LockWatcher() {
- @Override
- public void lostLock(final LockLossReason reason) {
- Halt.halt(1, () -> {
- LOG.error("Compactor lost lock (reason = {}), exiting.", reason);
- gcLogger.logGCInfo(getConfiguration());
- });
- }
- @Override
- public void unableToMonitorLockNode(final Exception e) {
- Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock,
exiting.", e));
- }
- };
+ LockWatcher lw = new ServiceLockWatcher("compactor", () -> false,
+ (name) -> gcLogger.logGCInfo(getConfiguration()));
try {
byte[] lockContent =
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index bd78388836..9658a21e7a 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -32,8 +32,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
+import
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher;
import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
@@ -44,7 +43,6 @@ import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.trace.thrift.TInfo;
-import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
@@ -365,31 +363,32 @@ public class SimpleGarbageCollector extends
AbstractServer implements Iface {
private void getZooLock(HostAndPort addr) throws KeeperException,
InterruptedException {
var path = ServiceLock.path(getContext().getZooKeeperRoot() +
Constants.ZGC_LOCK);
- LockWatcher lockWatcher = new LockWatcher() {
- @Override
- public void lostLock(LockLossReason reason) {
- Halt.halt("GC lock in zookeeper lost (reason = " + reason + "),
exiting!", 1);
- }
+ UUID zooLockUUID = UUID.randomUUID();
+ gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
path, zooLockUUID);
- @Override
- public void unableToMonitorLockNode(final Exception e) {
- // ACCUMULO-3651 Level changed to error and FATAL added to message for
slf4j compatibility
- Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock
node ", e));
+ while (true) {
+ HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc");
+ gcLock.lock(gcLockWatcher,
+ new ServerServices(addr.toString(),
Service.GC_CLIENT).toString().getBytes(UTF_8));
+ gcLockWatcher.waitForChange();
+
+ if (gcLockWatcher.isLockAcquired()) {
+ break;
}
- };
- UUID zooLockUUID = UUID.randomUUID();
- gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
path, zooLockUUID);
- while (true) {
- if (gcLock.tryLock(lockWatcher,
- new ServerServices(addr.toString(),
Service.GC_CLIENT).toString().getBytes(UTF_8))) {
- log.debug("Got GC ZooKeeper lock");
- return;
+ if (!gcLockWatcher.isFailedToAcquireLock()) {
+ throw new IllegalStateException("gc lock in unknown state");
}
+
+ gcLock.tryToCancelAsyncLockOrUnlock();
+
log.debug("Failed to get GC ZooKeeper lock, will retry");
- sleepUninterruptibly(1, TimeUnit.SECONDS);
+ sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
}
+
+ log.info("Got GC lock.");
+
}
private HostAndPort startStatsService() {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index a3540fa062..7ca35732cc 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -68,8 +68,8 @@ import org.apache.accumulo.core.fate.AgeOffStore;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.TStore;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
+import
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
@@ -105,7 +105,6 @@ import
org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
@@ -148,7 +147,6 @@ import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoAuthException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
@@ -1637,65 +1635,6 @@ public class Manager extends AbstractServer
return managerLock;
}
- private static class ManagerLockWatcher implements
ServiceLock.AccumuloLockWatcher {
-
- boolean acquiredLock = false;
- boolean failedToAcquireLock = false;
-
- @Override
- public void lostLock(LockLossReason reason) {
- Halt.halt("Manager lock in zookeeper lost (reason = " + reason + "),
exiting!", -1);
- }
-
- @Override
- public void unableToMonitorLockNode(final Exception e) {
- // ACCUMULO-3651 Changed level to error and added FATAL to message for
slf4j compatibility
- Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor manager
lock node", e));
-
- }
-
- @Override
- public synchronized void acquiredLock() {
- log.debug("Acquired manager lock");
-
- if (acquiredLock || failedToAcquireLock) {
- Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " +
failedToAcquireLock, -1);
- }
-
- acquiredLock = true;
- notifyAll();
- }
-
- @Override
- public synchronized void failedToAcquireLock(Exception e) {
- log.warn("Failed to get manager lock", e);
-
- if (e instanceof NoAuthException) {
- String msg = "Failed to acquire manager lock due to incorrect
ZooKeeper authentication.";
- log.error("{} Ensure instance.secret is consistent across Accumulo
configuration", msg, e);
- Halt.halt(msg, -1);
- }
-
- if (acquiredLock) {
- Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " +
failedToAcquireLock,
- -1);
- }
-
- failedToAcquireLock = true;
- notifyAll();
- }
-
- public synchronized void waitForChange() {
- while (!acquiredLock && !failedToAcquireLock) {
- try {
- wait();
- } catch (InterruptedException e) {
- // empty
- }
- }
- }
- }
-
private void getManagerLock(final ServiceLockPath zManagerLoc)
throws KeeperException, InterruptedException {
var zooKeeper = getContext().getZooReaderWriter().getZooKeeper();
@@ -1708,17 +1647,17 @@ public class Manager extends AbstractServer
managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID);
while (true) {
- ManagerLockWatcher managerLockWatcher = new ManagerLockWatcher();
+ HAServiceLockWatcher managerLockWatcher = new
HAServiceLockWatcher("manager");
managerLock.lock(managerLockWatcher,
managerClientAddress.getBytes(UTF_8));
managerLockWatcher.waitForChange();
- if (managerLockWatcher.acquiredLock) {
+ if (managerLockWatcher.isLockAcquired()) {
startServiceLockVerificationThread();
break;
}
- if (!managerLockWatcher.failedToAcquireLock) {
+ if (!managerLockWatcher.isFailedToAcquireLock()) {
throw new IllegalStateException("manager lock in unknown state");
}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 4108282bcb..17ed173714 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -52,7 +52,7 @@ import
org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
+import
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
@@ -71,7 +71,6 @@ import
org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ServerServices;
@@ -858,16 +857,16 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
monitorLock = new ServiceLock(zoo.getZooKeeper(), monitorLockPath,
zooLockUUID);
while (true) {
- MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher();
+ HAServiceLockWatcher monitorLockWatcher = new
HAServiceLockWatcher("monitor");
monitorLock.lock(monitorLockWatcher, new byte[0]);
monitorLockWatcher.waitForChange();
- if (monitorLockWatcher.acquiredLock) {
+ if (monitorLockWatcher.isLockAcquired()) {
break;
}
- if (!monitorLockWatcher.failedToAcquireLock) {
+ if (!monitorLockWatcher.isFailedToAcquireLock()) {
throw new IllegalStateException("monitor lock in unknown state");
}
@@ -881,57 +880,6 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
log.info("Got Monitor lock.");
}
- /**
- * Async Watcher for monitor lock
- */
- private static class MoniterLockWatcher implements
ServiceLock.AccumuloLockWatcher {
-
- boolean acquiredLock = false;
- boolean failedToAcquireLock = false;
-
- @Override
- public void lostLock(LockLossReason reason) {
- Halt.halt("Monitor lock in zookeeper lost (reason = " + reason + "),
exiting!", -1);
- }
-
- @Override
- public void unableToMonitorLockNode(final Exception e) {
- Halt.halt(-1, () -> log.error("No longer able to monitor Monitor lock
node", e));
-
- }
-
- @Override
- public synchronized void acquiredLock() {
- if (acquiredLock || failedToAcquireLock) {
- Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " +
failedToAcquireLock, -1);
- }
-
- acquiredLock = true;
- notifyAll();
- }
-
- @Override
- public synchronized void failedToAcquireLock(Exception e) {
- log.warn("Failed to get monitor lock " + e);
-
- if (acquiredLock) {
- Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " +
failedToAcquireLock,
- -1);
- }
-
- failedToAcquireLock = true;
- notifyAll();
- }
-
- public synchronized void waitForChange() {
- while (!acquiredLock && !failedToAcquireLock) {
- try {
- wait();
- } catch (InterruptedException e) {}
- }
- }
- }
-
public ManagerMonitorInfo getMmi() {
return mmi;
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index b3301f426c..bfbcdeed9b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -64,8 +64,8 @@ import org.apache.accumulo.core.dataImpl.thrift.TColumn;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TRange;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
+import
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -86,7 +86,6 @@ import
org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
import org.apache.accumulo.core.trace.thrift.TInfo;
-import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -349,24 +348,8 @@ public class ScanServer extends AbstractServer
serverLockUUID = UUID.randomUUID();
scanServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath,
serverLockUUID);
-
- LockWatcher lw = new LockWatcher() {
-
- @Override
- public void lostLock(final LockLossReason reason) {
- Halt.halt(serverStopRequested ? 0 : 1, () -> {
- if (!serverStopRequested) {
- LOG.error("Lost tablet server lock (reason = {}), exiting.",
reason);
- }
- gcLogger.logGCInfo(getConfiguration());
- });
- }
-
- @Override
- public void unableToMonitorLockNode(final Exception e) {
- Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server
lock, exiting.", e));
- }
- };
+ LockWatcher lw = new ServiceLockWatcher("scan server", () ->
serverStopRequested,
+ (name) -> gcLogger.logGCInfo(getConfiguration()));
// Don't use the normal ServerServices lock content, instead put the
server UUID here.
byte[] lockContent = (serverLockUUID.toString() + "," +
groupName).getBytes(UTF_8);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 54764b8672..3a3ad3f3b1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -73,8 +73,8 @@ import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
+import
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -95,7 +95,6 @@ import
org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.ComparablePair;
-import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.core.util.Pair;
@@ -679,24 +678,8 @@ public class TabletServer extends AbstractServer
implements TabletHostingServer
tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath,
UUID.randomUUID());
- LockWatcher lw = new LockWatcher() {
-
- @Override
- public void lostLock(final LockLossReason reason) {
- Halt.halt(serverStopRequested ? 0 : 1, () -> {
- if (!serverStopRequested) {
- log.error("Lost tablet server lock (reason = {}), exiting.",
reason);
- }
- gcLogger.logGCInfo(getConfiguration());
- });
- }
-
- @Override
- public void unableToMonitorLockNode(final Exception e) {
- Halt.halt(1, () -> log.error("Lost ability to monitor tablet server
lock, exiting.", e));
-
- }
- };
+ LockWatcher lw = new ServiceLockWatcher("tablet server", () ->
serverStopRequested,
+ (name) -> gcLogger.logGCInfo(getConfiguration()));
byte[] lockContent = new ServerServices(getClientAddressString(),
Service.TSERV_CLIENT)
.toString().getBytes(UTF_8);