This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new b3424a86b9 Made lock acquisition consistent (#5224)
b3424a86b9 is described below

commit b3424a86b9cc62cfba3ca5eb5f3eef249c82fa30
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Jan 6 14:08:55 2025 -0500

    Made lock acquisition consistent (#5224)
    
    Modified SimpleGarbageCollector to acquire the service
    lock in the same manner that other HA services acquire
    the service lock.
    
    Created a new class called ServiceLockSupport that is
    currently used to hold LockWatcher implementations for
    HA and non-HA servers.
    
    Closes #4839
    
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../core/fate/zookeeper/ServiceLockSupport.java    | 152 +++++++++++++++++++++
 .../coordinator/CompactionCoordinator.java         |   5 +-
 .../coordinator/CoordinatorLockWatcher.java        |  94 -------------
 .../org/apache/accumulo/compactor/Compactor.java   |  18 +--
 .../apache/accumulo/gc/SimpleGarbageCollector.java |  41 +++---
 .../java/org/apache/accumulo/manager/Manager.java  |  69 +---------
 .../java/org/apache/accumulo/monitor/Monitor.java  |  60 +-------
 .../org/apache/accumulo/tserver/ScanServer.java    |  23 +---
 .../org/apache/accumulo/tserver/TabletServer.java  |  23 +---
 9 files changed, 192 insertions(+), 293 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java
 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java
new file mode 100644
index 0000000000..b4d95a703a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLockSupport.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.zookeeper;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.accumulo.core.fate.zookeeper.ServiceLock.AccumuloLockWatcher;
+import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.zookeeper.KeeperException.NoAuthException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServiceLockSupport {
+
+  /**
+   * Lock Watcher used by Highly Available services. These are services where 
only instance is
+   * running at a time, but another backup service can be started that will be 
used if the active
+   * service instance fails and loses its lock in ZK.
+   */
+  public static class HAServiceLockWatcher implements AccumuloLockWatcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HAServiceLockWatcher.class);
+
+    private final String serviceName;
+    private volatile boolean acquiredLock = false;
+    private volatile boolean failedToAcquireLock = false;
+
+    public HAServiceLockWatcher(String serviceName) {
+      this.serviceName = serviceName;
+    }
+
+    @Override
+    public void lostLock(LockLossReason reason) {
+      Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + 
"), exiting!", -1);
+    }
+
+    @Override
+    public void unableToMonitorLockNode(final Exception e) {
+      // ACCUMULO-3651 Changed level to error and added FATAL to message for 
slf4j compatibility
+      Halt.halt(-1,
+          () -> LOG.error("FATAL: No longer able to monitor {} lock node", 
serviceName, e));
+
+    }
+
+    @Override
+    public synchronized void acquiredLock() {
+      LOG.debug("Acquired {} lock", serviceName);
+
+      if (acquiredLock || failedToAcquireLock) {
+        Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + 
failedToAcquireLock, -1);
+      }
+
+      acquiredLock = true;
+      notifyAll();
+    }
+
+    @Override
+    public synchronized void failedToAcquireLock(Exception e) {
+      LOG.warn("Failed to get {} lock", serviceName, e);
+
+      if (e instanceof NoAuthException) {
+        String msg =
+            "Failed to acquire " + serviceName + " lock due to incorrect 
ZooKeeper authentication.";
+        LOG.error("{} Ensure instance.secret is consistent across Accumulo 
configuration", msg, e);
+        Halt.halt(msg, -1);
+      }
+
+      if (acquiredLock) {
+        Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + 
failedToAcquireLock,
+            -1);
+      }
+
+      failedToAcquireLock = true;
+      notifyAll();
+    }
+
+    public synchronized void waitForChange() {
+      while (!acquiredLock && !failedToAcquireLock) {
+        try {
+          LOG.info("{} lock held by someone else, waiting for a change in 
state", serviceName);
+          wait();
+        } catch (InterruptedException e) {
+          // empty
+        }
+      }
+    }
+
+    public boolean isLockAcquired() {
+      return acquiredLock;
+    }
+
+    public boolean isFailedToAcquireLock() {
+      return failedToAcquireLock;
+    }
+
+  }
+
+  /**
+   * Lock Watcher used by non-HA services
+   */
+  public static class ServiceLockWatcher implements LockWatcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ServiceLockWatcher.class);
+
+    private final String serviceName;
+    private final Supplier<Boolean> shuttingDown;
+    private final Consumer<String> lostLockAction;
+
+    public ServiceLockWatcher(String serviceName, Supplier<Boolean> 
shuttingDown,
+        Consumer<String> lostLockAction) {
+      this.serviceName = serviceName;
+      this.shuttingDown = shuttingDown;
+      this.lostLockAction = lostLockAction;
+    }
+
+    @Override
+    public void lostLock(final LockLossReason reason) {
+      Halt.halt(1, () -> {
+        if (!shuttingDown.get()) {
+          LOG.error("{} lost lock (reason = {}), exiting.", serviceName, 
reason);
+        }
+        lostLockAction.accept(serviceName);
+      });
+    }
+
+    @Override
+    public void unableToMonitorLockNode(final Exception e) {
+      Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, 
exiting.", serviceName, e));
+    }
+
+  }
+
+}
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index b735d8544d..7d853b4808 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -58,6 +58,7 @@ import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
+import 
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample;
@@ -223,11 +224,11 @@ public class CompactionCoordinator extends AbstractServer
         ServiceLock.path(lockPath), zooLockUUID);
     while (true) {
 
-      CoordinatorLockWatcher coordinatorLockWatcher = new 
CoordinatorLockWatcher();
+      HAServiceLockWatcher coordinatorLockWatcher = new 
HAServiceLockWatcher("coordinator");
       coordinatorLock.lock(coordinatorLockWatcher, 
coordinatorClientAddress.getBytes(UTF_8));
 
       coordinatorLockWatcher.waitForChange();
-      if (coordinatorLockWatcher.isAcquiredLock()) {
+      if (coordinatorLockWatcher.isLockAcquired()) {
         break;
       }
       if (!coordinatorLockWatcher.isFailedToAcquireLock()) {
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
deleted file mode 100644
index 663ecaf4a9..0000000000
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.coordinator;
-
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
-import org.apache.accumulo.core.util.Halt;
-import org.apache.zookeeper.KeeperException.NoAuthException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CoordinatorLockWatcher implements ServiceLock.AccumuloLockWatcher 
{
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorLockWatcher.class);
-
-  private volatile boolean acquiredLock = false;
-  private volatile boolean failedToAcquireLock = false;
-
-  @Override
-  public void lostLock(LockLossReason reason) {
-    Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), 
exiting!", -1);
-  }
-
-  @Override
-  public void unableToMonitorLockNode(final Exception e) {
-    // ACCUMULO-3651 Changed level to error and added FATAL to message for 
slf4j compatibility
-    Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor 
Coordinator lock node", e));
-
-  }
-
-  @Override
-  public synchronized void acquiredLock() {
-    LOG.debug("Acquired Coordinator lock");
-
-    if (acquiredLock || failedToAcquireLock) {
-      Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + 
failedToAcquireLock, -1);
-    }
-
-    acquiredLock = true;
-    notifyAll();
-  }
-
-  @Override
-  public synchronized void failedToAcquireLock(Exception e) {
-    LOG.warn("Failed to get Coordinator lock", e);
-
-    if (e instanceof NoAuthException) {
-      String msg = "Failed to acquire Coordinator lock due to incorrect 
ZooKeeper authentication.";
-      LOG.error("{} Ensure instance.secret is consistent across Accumulo 
configuration", msg, e);
-      Halt.halt(msg, -1);
-    }
-
-    if (acquiredLock) {
-      Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + 
failedToAcquireLock, -1);
-    }
-
-    failedToAcquireLock = true;
-    notifyAll();
-  }
-
-  public synchronized void waitForChange() {
-    while (!acquiredLock && !failedToAcquireLock) {
-      try {
-        LOG.info("Coordinator lock held by someone else, waiting for a change 
in state");
-        wait();
-      } catch (InterruptedException e) {}
-    }
-  }
-
-  public boolean isAcquiredLock() {
-    return acquiredLock;
-  }
-
-  public boolean isFailedToAcquireLock() {
-    return failedToAcquireLock;
-  }
-
-}
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 83ff5de7ee..f95fad41ab 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -66,8 +66,8 @@ import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
+import 
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.core.file.FileOperations;
@@ -91,7 +91,6 @@ import 
org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.trace.thrift.TInfo;
-import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
@@ -288,20 +287,9 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
 
     compactorLock = new 
ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
         ServiceLock.path(zPath), compactorId);
-    LockWatcher lw = new LockWatcher() {
-      @Override
-      public void lostLock(final LockLossReason reason) {
-        Halt.halt(1, () -> {
-          LOG.error("Compactor lost lock (reason = {}), exiting.", reason);
-          gcLogger.logGCInfo(getConfiguration());
-        });
-      }
 
-      @Override
-      public void unableToMonitorLockNode(final Exception e) {
-        Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, 
exiting.", e));
-      }
-    };
+    LockWatcher lw = new ServiceLockWatcher("compactor", () -> false,
+        (name) -> gcLogger.logGCInfo(getConfiguration()));
 
     try {
       byte[] lockContent =
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index bd78388836..9658a21e7a 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -32,8 +32,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
+import 
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher;
 import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
@@ -44,7 +43,6 @@ import org.apache.accumulo.core.metrics.MetricsInfo;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.trace.thrift.TInfo;
-import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
@@ -365,31 +363,32 @@ public class SimpleGarbageCollector extends 
AbstractServer implements Iface {
   private void getZooLock(HostAndPort addr) throws KeeperException, 
InterruptedException {
     var path = ServiceLock.path(getContext().getZooKeeperRoot() + 
Constants.ZGC_LOCK);
 
-    LockWatcher lockWatcher = new LockWatcher() {
-      @Override
-      public void lostLock(LockLossReason reason) {
-        Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), 
exiting!", 1);
-      }
+    UUID zooLockUUID = UUID.randomUUID();
+    gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), 
path, zooLockUUID);
 
-      @Override
-      public void unableToMonitorLockNode(final Exception e) {
-        // ACCUMULO-3651 Level changed to error and FATAL added to message for 
slf4j compatibility
-        Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock 
node ", e));
+    while (true) {
+      HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc");
+      gcLock.lock(gcLockWatcher,
+          new ServerServices(addr.toString(), 
Service.GC_CLIENT).toString().getBytes(UTF_8));
 
+      gcLockWatcher.waitForChange();
+
+      if (gcLockWatcher.isLockAcquired()) {
+        break;
       }
-    };
 
-    UUID zooLockUUID = UUID.randomUUID();
-    gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), 
path, zooLockUUID);
-    while (true) {
-      if (gcLock.tryLock(lockWatcher,
-          new ServerServices(addr.toString(), 
Service.GC_CLIENT).toString().getBytes(UTF_8))) {
-        log.debug("Got GC ZooKeeper lock");
-        return;
+      if (!gcLockWatcher.isFailedToAcquireLock()) {
+        throw new IllegalStateException("gc lock in unknown state");
       }
+
+      gcLock.tryToCancelAsyncLockOrUnlock();
+
       log.debug("Failed to get GC ZooKeeper lock, will retry");
-      sleepUninterruptibly(1, TimeUnit.SECONDS);
+      sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
     }
+
+    log.info("Got GC lock.");
+
   }
 
   private HostAndPort startStatsService() {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index a3540fa062..7ca35732cc 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -68,8 +68,8 @@ import org.apache.accumulo.core.fate.AgeOffStore;
 import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.TStore;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
+import 
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
@@ -105,7 +105,6 @@ import 
org.apache.accumulo.core.spi.balancer.data.TabletMigration;
 import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
 import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
 import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
@@ -148,7 +147,6 @@ import org.apache.thrift.TException;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoAuthException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
@@ -1637,65 +1635,6 @@ public class Manager extends AbstractServer
     return managerLock;
   }
 
-  private static class ManagerLockWatcher implements 
ServiceLock.AccumuloLockWatcher {
-
-    boolean acquiredLock = false;
-    boolean failedToAcquireLock = false;
-
-    @Override
-    public void lostLock(LockLossReason reason) {
-      Halt.halt("Manager lock in zookeeper lost (reason = " + reason + "), 
exiting!", -1);
-    }
-
-    @Override
-    public void unableToMonitorLockNode(final Exception e) {
-      // ACCUMULO-3651 Changed level to error and added FATAL to message for 
slf4j compatibility
-      Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor manager 
lock node", e));
-
-    }
-
-    @Override
-    public synchronized void acquiredLock() {
-      log.debug("Acquired manager lock");
-
-      if (acquiredLock || failedToAcquireLock) {
-        Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + 
failedToAcquireLock, -1);
-      }
-
-      acquiredLock = true;
-      notifyAll();
-    }
-
-    @Override
-    public synchronized void failedToAcquireLock(Exception e) {
-      log.warn("Failed to get manager lock", e);
-
-      if (e instanceof NoAuthException) {
-        String msg = "Failed to acquire manager lock due to incorrect 
ZooKeeper authentication.";
-        log.error("{} Ensure instance.secret is consistent across Accumulo 
configuration", msg, e);
-        Halt.halt(msg, -1);
-      }
-
-      if (acquiredLock) {
-        Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + 
failedToAcquireLock,
-            -1);
-      }
-
-      failedToAcquireLock = true;
-      notifyAll();
-    }
-
-    public synchronized void waitForChange() {
-      while (!acquiredLock && !failedToAcquireLock) {
-        try {
-          wait();
-        } catch (InterruptedException e) {
-          // empty
-        }
-      }
-    }
-  }
-
   private void getManagerLock(final ServiceLockPath zManagerLoc)
       throws KeeperException, InterruptedException {
     var zooKeeper = getContext().getZooReaderWriter().getZooKeeper();
@@ -1708,17 +1647,17 @@ public class Manager extends AbstractServer
     managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID);
     while (true) {
 
-      ManagerLockWatcher managerLockWatcher = new ManagerLockWatcher();
+      HAServiceLockWatcher managerLockWatcher = new 
HAServiceLockWatcher("manager");
       managerLock.lock(managerLockWatcher, 
managerClientAddress.getBytes(UTF_8));
 
       managerLockWatcher.waitForChange();
 
-      if (managerLockWatcher.acquiredLock) {
+      if (managerLockWatcher.isLockAcquired()) {
         startServiceLockVerificationThread();
         break;
       }
 
-      if (!managerLockWatcher.failedToAcquireLock) {
+      if (!managerLockWatcher.isFailedToAcquireLock()) {
         throw new IllegalStateException("manager lock in unknown state");
       }
 
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 4108282bcb..17ed173714 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -52,7 +52,7 @@ import 
org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
+import 
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
@@ -71,7 +71,6 @@ import 
org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
 import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.ServerServices;
@@ -858,16 +857,16 @@ public class Monitor extends AbstractServer implements 
HighlyAvailableService {
     monitorLock = new ServiceLock(zoo.getZooKeeper(), monitorLockPath, 
zooLockUUID);
 
     while (true) {
-      MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher();
+      HAServiceLockWatcher monitorLockWatcher = new 
HAServiceLockWatcher("monitor");
       monitorLock.lock(monitorLockWatcher, new byte[0]);
 
       monitorLockWatcher.waitForChange();
 
-      if (monitorLockWatcher.acquiredLock) {
+      if (monitorLockWatcher.isLockAcquired()) {
         break;
       }
 
-      if (!monitorLockWatcher.failedToAcquireLock) {
+      if (!monitorLockWatcher.isFailedToAcquireLock()) {
         throw new IllegalStateException("monitor lock in unknown state");
       }
 
@@ -881,57 +880,6 @@ public class Monitor extends AbstractServer implements 
HighlyAvailableService {
     log.info("Got Monitor lock.");
   }
 
-  /**
-   * Async Watcher for monitor lock
-   */
-  private static class MoniterLockWatcher implements 
ServiceLock.AccumuloLockWatcher {
-
-    boolean acquiredLock = false;
-    boolean failedToAcquireLock = false;
-
-    @Override
-    public void lostLock(LockLossReason reason) {
-      Halt.halt("Monitor lock in zookeeper lost (reason = " + reason + "), 
exiting!", -1);
-    }
-
-    @Override
-    public void unableToMonitorLockNode(final Exception e) {
-      Halt.halt(-1, () -> log.error("No longer able to monitor Monitor lock 
node", e));
-
-    }
-
-    @Override
-    public synchronized void acquiredLock() {
-      if (acquiredLock || failedToAcquireLock) {
-        Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + 
failedToAcquireLock, -1);
-      }
-
-      acquiredLock = true;
-      notifyAll();
-    }
-
-    @Override
-    public synchronized void failedToAcquireLock(Exception e) {
-      log.warn("Failed to get monitor lock " + e);
-
-      if (acquiredLock) {
-        Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + 
failedToAcquireLock,
-            -1);
-      }
-
-      failedToAcquireLock = true;
-      notifyAll();
-    }
-
-    public synchronized void waitForChange() {
-      while (!acquiredLock && !failedToAcquireLock) {
-        try {
-          wait();
-        } catch (InterruptedException e) {}
-      }
-    }
-  }
-
   public ManagerMonitorInfo getMmi() {
     return mmi;
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index b3301f426c..bfbcdeed9b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -64,8 +64,8 @@ import org.apache.accumulo.core.dataImpl.thrift.TColumn;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TRange;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
+import 
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -86,7 +86,6 @@ import 
org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
 import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
 import org.apache.accumulo.core.trace.thrift.TInfo;
-import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -349,24 +348,8 @@ public class ScanServer extends AbstractServer
 
       serverLockUUID = UUID.randomUUID();
       scanServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, 
serverLockUUID);
-
-      LockWatcher lw = new LockWatcher() {
-
-        @Override
-        public void lostLock(final LockLossReason reason) {
-          Halt.halt(serverStopRequested ? 0 : 1, () -> {
-            if (!serverStopRequested) {
-              LOG.error("Lost tablet server lock (reason = {}), exiting.", 
reason);
-            }
-            gcLogger.logGCInfo(getConfiguration());
-          });
-        }
-
-        @Override
-        public void unableToMonitorLockNode(final Exception e) {
-          Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server 
lock, exiting.", e));
-        }
-      };
+      LockWatcher lw = new ServiceLockWatcher("scan server", () -> 
serverStopRequested,
+          (name) -> gcLogger.logGCInfo(getConfiguration()));
 
       // Don't use the normal ServerServices lock content, instead put the 
server UUID here.
       byte[] lockContent = (serverLockUUID.toString() + "," + 
groupName).getBytes(UTF_8);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 54764b8672..3a3ad3f3b1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -73,8 +73,8 @@ import org.apache.accumulo.core.data.InstanceId;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
+import 
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -95,7 +95,6 @@ import 
org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.ComparablePair;
-import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.MapCounter;
 import org.apache.accumulo.core.util.Pair;
@@ -679,24 +678,8 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
 
       tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, 
UUID.randomUUID());
 
-      LockWatcher lw = new LockWatcher() {
-
-        @Override
-        public void lostLock(final LockLossReason reason) {
-          Halt.halt(serverStopRequested ? 0 : 1, () -> {
-            if (!serverStopRequested) {
-              log.error("Lost tablet server lock (reason = {}), exiting.", 
reason);
-            }
-            gcLogger.logGCInfo(getConfiguration());
-          });
-        }
-
-        @Override
-        public void unableToMonitorLockNode(final Exception e) {
-          Halt.halt(1, () -> log.error("Lost ability to monitor tablet server 
lock, exiting.", e));
-
-        }
-      };
+      LockWatcher lw = new ServiceLockWatcher("tablet server", () -> 
serverStopRequested,
+          (name) -> gcLogger.logGCInfo(getConfiguration()));
 
       byte[] lockContent = new ServerServices(getClientAddressString(), 
Service.TSERV_CLIENT)
           .toString().getBytes(UTF_8);

Reply via email to