This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 81c7f3db483065bbaff1acf655d1eb3ad5ef55c0
Merge: c48adeaa23 b3424a86b9
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Jan 6 19:37:18 2025 +0000

    Merge branch '2.1' into 3.1

 .../accumulo/core/lock/ServiceLockSupport.java     | 152 +++++++++++++++++++++
 .../coordinator/CompactionCoordinator.java         |   5 +-
 .../coordinator/CoordinatorLockWatcher.java        |  94 -------------
 .../org/apache/accumulo/compactor/Compactor.java   |  19 +--
 .../apache/accumulo/gc/SimpleGarbageCollector.java |  41 +++---
 .../java/org/apache/accumulo/manager/Manager.java  |  69 +---------
 .../java/org/apache/accumulo/monitor/Monitor.java  |  60 +-------
 .../org/apache/accumulo/tserver/ScanServer.java    |  23 +---
 .../org/apache/accumulo/tserver/TabletServer.java  |  23 +---
 9 files changed, 192 insertions(+), 294 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
index 0000000000,b4d95a703a..3129511b5e
mode 000000,100644..100644
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
@@@ -1,0 -1,152 +1,152 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
 -package org.apache.accumulo.core.fate.zookeeper;
++package org.apache.accumulo.core.lock;
+ 
+ import java.util.function.Consumer;
+ import java.util.function.Supplier;
+ 
 -import 
org.apache.accumulo.core.fate.zookeeper.ServiceLock.AccumuloLockWatcher;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
++import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher;
++import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
++import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
+ import org.apache.accumulo.core.util.Halt;
+ import org.apache.zookeeper.KeeperException.NoAuthException;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class ServiceLockSupport {
+ 
+   /**
+    * Lock Watcher used by Highly Available services. These are services where 
only one instance is
+    * running at a time, but another backup service can be started that will 
be used if the active
+    * service instance fails and loses its lock in ZK.
+    */
+   public static class HAServiceLockWatcher implements AccumuloLockWatcher {
+ 
+     private static final Logger LOG = 
LoggerFactory.getLogger(HAServiceLockWatcher.class);
+ 
+     private final String serviceName;
+     private volatile boolean acquiredLock = false;
+     private volatile boolean failedToAcquireLock = false;
+ 
+     public HAServiceLockWatcher(String serviceName) {
+       this.serviceName = serviceName;
+     }
+ 
+     @Override
+     public void lostLock(LockLossReason reason) {
+       Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + 
"), exiting!", -1);
+     }
+ 
+     @Override
+     public void unableToMonitorLockNode(final Exception e) {
+       // ACCUMULO-3651 Changed level to error and added FATAL to message for 
slf4j compatibility
+       Halt.halt(-1,
+           () -> LOG.error("FATAL: No longer able to monitor {} lock node", 
serviceName, e));
+ 
+     }
+ 
+     @Override
+     public synchronized void acquiredLock() {
+       LOG.debug("Acquired {} lock", serviceName);
+ 
+       if (acquiredLock || failedToAcquireLock) {
+         Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + 
failedToAcquireLock, -1);
+       }
+ 
+       acquiredLock = true;
+       notifyAll();
+     }
+ 
+     @Override
+     public synchronized void failedToAcquireLock(Exception e) {
+       LOG.warn("Failed to get {} lock", serviceName, e);
+ 
+       if (e instanceof NoAuthException) {
+         String msg =
+             "Failed to acquire " + serviceName + " lock due to incorrect 
ZooKeeper authentication.";
+         LOG.error("{} Ensure instance.secret is consistent across Accumulo 
configuration", msg, e);
+         Halt.halt(msg, -1);
+       }
+ 
+       if (acquiredLock) {
+         Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + 
failedToAcquireLock,
+             -1);
+       }
+ 
+       failedToAcquireLock = true;
+       notifyAll();
+     }
+ 
+     public synchronized void waitForChange() {
+       while (!acquiredLock && !failedToAcquireLock) {
+         try {
+           LOG.info("{} lock held by someone else, waiting for a change in 
state", serviceName);
+           wait();
+         } catch (InterruptedException e) {
+           // empty
+         }
+       }
+     }
+ 
+     public boolean isLockAcquired() {
+       return acquiredLock;
+     }
+ 
+     public boolean isFailedToAcquireLock() {
+       return failedToAcquireLock;
+     }
+ 
+   }
+ 
+   /**
+    * Lock Watcher used by non-HA services
+    */
+   public static class ServiceLockWatcher implements LockWatcher {
+ 
+     private static final Logger LOG = 
LoggerFactory.getLogger(ServiceLockWatcher.class);
+ 
+     private final String serviceName;
+     private final Supplier<Boolean> shuttingDown;
+     private final Consumer<String> lostLockAction;
+ 
+     public ServiceLockWatcher(String serviceName, Supplier<Boolean> 
shuttingDown,
+         Consumer<String> lostLockAction) {
+       this.serviceName = serviceName;
+       this.shuttingDown = shuttingDown;
+       this.lostLockAction = lostLockAction;
+     }
+ 
+     @Override
+     public void lostLock(final LockLossReason reason) {
+       Halt.halt(1, () -> {
+         if (!shuttingDown.get()) {
+           LOG.error("{} lost lock (reason = {}), exiting.", serviceName, 
reason);
+         }
+         lostLockAction.accept(serviceName);
+       });
+     }
+ 
+     @Override
+     public void unableToMonitorLockNode(final Exception e) {
+       Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, 
exiting.", serviceName, e));
+     }
+ 
+   }
+ 
+ }
diff --cc 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index fd4e27fc65,7d853b4808..09d1852f46
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@@ -60,10 -57,9 +60,11 @@@ import org.apache.accumulo.core.conf.Pr
  import org.apache.accumulo.core.data.NamespaceId;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
  import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 -import 
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher;
  import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
++import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher;
  import org.apache.accumulo.core.metadata.TServerInstance;
  import org.apache.accumulo.core.metadata.schema.Ample;
  import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@@ -219,12 -224,11 +220,12 @@@ public class CompactionCoordinator exte
          ServiceLock.path(lockPath), zooLockUUID);
      while (true) {
  
-       CoordinatorLockWatcher coordinatorLockWatcher = new 
CoordinatorLockWatcher();
+       HAServiceLockWatcher coordinatorLockWatcher = new 
HAServiceLockWatcher("coordinator");
 -      coordinatorLock.lock(coordinatorLockWatcher, 
coordinatorClientAddress.getBytes(UTF_8));
 +      coordinatorLock.lock(coordinatorLockWatcher,
 +          new ServiceLockData(zooLockUUID, coordinatorClientAddress, 
ThriftService.COORDINATOR));
  
        coordinatorLockWatcher.waitForChange();
-       if (coordinatorLockWatcher.isAcquiredLock()) {
+       if (coordinatorLockWatcher.isLockAcquired()) {
          break;
        }
        if (!coordinatorLockWatcher.isFailedToAcquireLock()) {
diff --cc 
server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 3fe294ad62,f95fad41ab..3d60d89fb1
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -74,15 -73,8 +74,15 @@@ import org.apache.accumulo.core.fate.zo
  import org.apache.accumulo.core.file.FileOperations;
  import org.apache.accumulo.core.file.FileSKVIterator;
  import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
 +import org.apache.accumulo.core.lock.ServiceLock;
- import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
 +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
 +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
 +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
++import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
  import org.apache.accumulo.core.metadata.StoredTabletFile;
 -import org.apache.accumulo.core.metadata.TabletFile;
  import org.apache.accumulo.core.metadata.schema.DataFileValue;
  import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
  import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@@ -98,8 -90,10 +98,7 @@@ import org.apache.accumulo.core.tablets
  import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
  import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
  import org.apache.accumulo.core.trace.TraceUtil;
- import org.apache.accumulo.core.util.Halt;
 -import org.apache.accumulo.core.trace.thrift.TInfo;
 -import org.apache.accumulo.core.util.HostAndPort;
 -import org.apache.accumulo.core.util.ServerServices;
 -import org.apache.accumulo.core.util.ServerServices.Service;
 +import org.apache.accumulo.core.util.Timer;
  import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
  import org.apache.accumulo.core.util.threads.ThreadPools;
@@@ -273,22 -287,13 +272,10 @@@ public class Compactor extends Abstract
  
      compactorLock = new 
ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
          ServiceLock.path(zPath), compactorId);
-     LockWatcher lw = new LockWatcher() {
-       @Override
-       public void lostLock(final LockLossReason reason) {
-         Halt.halt(1, () -> {
-           LOG.error("Compactor lost lock (reason = {}), exiting.", reason);
-           getContext().getLowMemoryDetector().logGCInfo(getConfiguration());
-         });
-       }
--
-       @Override
-       public void unableToMonitorLockNode(final Exception e) {
-         Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, 
exiting.", e));
-       }
-     };
+     LockWatcher lw = new ServiceLockWatcher("compactor", () -> false,
 -        (name) -> gcLogger.logGCInfo(getConfiguration()));
++        (name) -> 
getContext().getLowMemoryDetector().logGCInfo(getConfiguration()));
  
      try {
 -      byte[] lockContent =
 -          new ServerServices(hostPort, 
Service.COMPACTOR_CLIENT).toString().getBytes(UTF_8);
        for (int i = 0; i < 25; i++) {
          zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP);
  
diff --cc 
server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 62c2280b84,9658a21e7a..adc15bea23
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@@ -35,17 -34,18 +35,15 @@@ import org.apache.accumulo.core.conf.Pr
  import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface;
  import org.apache.accumulo.core.gc.thrift.GCStatus;
  import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 -import org.apache.accumulo.core.metadata.MetadataTable;
 -import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.lock.ServiceLock;
- import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
- import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
++import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher;
 +import org.apache.accumulo.core.metadata.AccumuloTable;
  import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
  import org.apache.accumulo.core.metrics.MetricsInfo;
  import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
  import org.apache.accumulo.core.trace.TraceUtil;
- import org.apache.accumulo.core.util.Halt;
 -import org.apache.accumulo.core.trace.thrift.TInfo;
 -import org.apache.accumulo.core.util.HostAndPort;
 -import org.apache.accumulo.core.util.ServerServices;
 -import org.apache.accumulo.core.util.ServerServices.Service;
  import org.apache.accumulo.core.util.threads.ThreadPools;
  import org.apache.accumulo.gc.metrics.GcCycleMetrics;
  import org.apache.accumulo.gc.metrics.GcMetrics;
@@@ -336,31 -363,32 +334,32 @@@ public class SimpleGarbageCollector ext
    private void getZooLock(HostAndPort addr) throws KeeperException, 
InterruptedException {
      var path = ServiceLock.path(getContext().getZooKeeperRoot() + 
Constants.ZGC_LOCK);
  
-     LockWatcher lockWatcher = new LockWatcher() {
-       @Override
-       public void lostLock(LockLossReason reason) {
-         Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), 
exiting!", 1);
-       }
+     UUID zooLockUUID = UUID.randomUUID();
+     gcLock = new 
ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, 
zooLockUUID);
  
-       @Override
-       public void unableToMonitorLockNode(final Exception e) {
-         // ACCUMULO-3651 Level changed to error and FATAL added to message 
for slf4j compatibility
-         Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock 
node ", e));
+     while (true) {
+       HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc");
+       gcLock.lock(gcLockWatcher,
 -          new ServerServices(addr.toString(), 
Service.GC_CLIENT).toString().getBytes(UTF_8));
++          new ServiceLockData(zooLockUUID, addr.toString(), 
ThriftService.GC));
  
+       gcLockWatcher.waitForChange();
+ 
+       if (gcLockWatcher.isLockAcquired()) {
+         break;
        }
-     };
  
-     UUID zooLockUUID = UUID.randomUUID();
-     gcLock = new 
ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, 
zooLockUUID);
-     while (true) {
-       if (gcLock.tryLock(lockWatcher,
-           new ServiceLockData(zooLockUUID, addr.toString(), 
ThriftService.GC))) {
-         log.debug("Got GC ZooKeeper lock");
-         return;
+       if (!gcLockWatcher.isFailedToAcquireLock()) {
+         throw new IllegalStateException("gc lock in unknown state");
        }
+ 
+       gcLock.tryToCancelAsyncLockOrUnlock();
+ 
        log.debug("Failed to get GC ZooKeeper lock, will retry");
-       sleepUninterruptibly(1, TimeUnit.SECONDS);
+       sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
      }
+ 
+     log.info("Got GC lock.");
+ 
    }
  
    private HostAndPort startStatsService() {
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index dfb672013f,7ca35732cc..c52f6efcfb
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -73,11 -75,6 +73,11 @@@ import org.apache.accumulo.core.fate.zo
  import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
  import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
  import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.accumulo.core.lock.ServiceLock;
- import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
 +import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
++import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher;
  import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl;
  import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
  import org.apache.accumulo.core.manager.balancer.TServerStatusImpl;
@@@ -103,14 -103,11 +103,13 @@@ import org.apache.accumulo.core.spi.bal
  import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
  import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
  import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
 -import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
 +import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal;
  import org.apache.accumulo.core.trace.TraceUtil;
- import org.apache.accumulo.core.util.Halt;
  import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.core.util.Timer;
  import org.apache.accumulo.core.util.threads.ThreadPools;
  import org.apache.accumulo.core.util.threads.Threads;
 +import org.apache.accumulo.core.util.time.SteadyTime;
  import org.apache.accumulo.manager.metrics.BalancerMetrics;
  import org.apache.accumulo.manager.metrics.ManagerMetrics;
  import org.apache.accumulo.manager.recovery.RecoveryManager;
@@@ -1564,66 -1635,7 +1562,7 @@@ public class Manager extends AbstractSe
      return managerLock;
    }
  
-   private static class ManagerLockWatcher implements 
ServiceLock.AccumuloLockWatcher {
- 
-     boolean acquiredLock = false;
-     boolean failedToAcquireLock = false;
- 
-     @Override
-     public void lostLock(LockLossReason reason) {
-       Halt.halt("Manager lock in zookeeper lost (reason = " + reason + "), 
exiting!", -1);
-     }
- 
-     @Override
-     public void unableToMonitorLockNode(final Exception e) {
-       // ACCUMULO-3651 Changed level to error and added FATAL to message for 
slf4j compatibility
-       Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor manager 
lock node", e));
- 
-     }
- 
-     @Override
-     public synchronized void acquiredLock() {
-       log.debug("Acquired manager lock");
- 
-       if (acquiredLock || failedToAcquireLock) {
-         Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + 
failedToAcquireLock, -1);
-       }
- 
-       acquiredLock = true;
-       notifyAll();
-     }
- 
-     @Override
-     public synchronized void failedToAcquireLock(Exception e) {
-       log.warn("Failed to get manager lock", e);
- 
-       if (e instanceof NoAuthException) {
-         String msg = "Failed to acquire manager lock due to incorrect 
ZooKeeper authentication.";
-         log.error("{} Ensure instance.secret is consistent across Accumulo 
configuration", msg, e);
-         Halt.halt(msg, -1);
-       }
- 
-       if (acquiredLock) {
-         Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + 
failedToAcquireLock,
-             -1);
-       }
- 
-       failedToAcquireLock = true;
-       notifyAll();
-     }
- 
-     public synchronized void waitForChange() {
-       while (!acquiredLock && !failedToAcquireLock) {
-         try {
-           wait();
-         } catch (InterruptedException e) {
-           // empty
-         }
-       }
-     }
-   }
- 
 -  private void getManagerLock(final ServiceLockPath zManagerLoc)
 +  private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc)
        throws KeeperException, InterruptedException {
      var zooKeeper = getContext().getZooReaderWriter().getZooKeeper();
      log.info("trying to get manager lock");
@@@ -1638,8 -1647,8 +1577,8 @@@
      managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID);
      while (true) {
  
-       ManagerLockWatcher managerLockWatcher = new ManagerLockWatcher();
+       HAServiceLockWatcher managerLockWatcher = new 
HAServiceLockWatcher("manager");
 -      managerLock.lock(managerLockWatcher, 
managerClientAddress.getBytes(UTF_8));
 +      managerLock.lock(managerLockWatcher, sld);
  
        managerLockWatcher.waitForChange();
  
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 239394f6ab,17ed173714..839dfd0eae
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@@ -53,25 -58,23 +53,24 @@@ import org.apache.accumulo.core.fate.zo
  import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
  import org.apache.accumulo.core.gc.thrift.GCMonitorService;
  import org.apache.accumulo.core.gc.thrift.GCStatus;
 +import org.apache.accumulo.core.lock.ServiceLock;
- import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
++import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher;
  import org.apache.accumulo.core.manager.thrift.ManagerClientService;
  import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
 -import org.apache.accumulo.core.master.thrift.TableInfo;
 -import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.manager.thrift.TableInfo;
 +import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
  import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
  import org.apache.accumulo.core.metrics.MetricsInfo;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 +import org.apache.accumulo.core.tabletscan.thrift.ActiveScan;
 +import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService;
  import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 -import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 -import 
org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 -import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
 +import 
org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client;
  import org.apache.accumulo.core.trace.TraceUtil;
- import org.apache.accumulo.core.util.Halt;
 -import org.apache.accumulo.core.util.HostAndPort;
  import org.apache.accumulo.core.util.Pair;
 -import org.apache.accumulo.core.util.ServerServices;
 -import org.apache.accumulo.core.util.ServerServices.Service;
  import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
  import org.apache.accumulo.core.util.threads.Threads;
  import 
org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo;
@@@ -754,9 -857,8 +753,9 @@@ public class Monitor extends AbstractSe
      monitorLock = new ServiceLock(zoo.getZooKeeper(), monitorLockPath, 
zooLockUUID);
  
      while (true) {
-       MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher();
+       HAServiceLockWatcher monitorLockWatcher = new 
HAServiceLockWatcher("monitor");
 -      monitorLock.lock(monitorLockWatcher, new byte[0]);
 +      monitorLock.lock(monitorLockWatcher, new ServiceLockData(zooLockUUID,
 +          monitorLocation.getHost() + ":" + monitorLocation.getPort(), 
ThriftService.NONE));
  
        monitorLockWatcher.waitForChange();
  
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 7fcd59dcc9,bfbcdeed9b..8db73cb001
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@@ -68,30 -70,23 +68,29 @@@ import org.apache.accumulo.core.fate.zo
  import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
  import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
  import 
org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
 +import org.apache.accumulo.core.lock.ServiceLock;
- import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
 +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
 +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
 +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
++import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher;
  import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
  import org.apache.accumulo.core.metadata.StoredTabletFile;
  import org.apache.accumulo.core.metadata.schema.Ample;
  import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
  import org.apache.accumulo.core.metrics.MetricsInfo;
  import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 -import org.apache.accumulo.core.spi.scan.ScanServerSelector;
 -import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 +import org.apache.accumulo.core.tabletscan.thrift.ActiveScan;
 +import org.apache.accumulo.core.tabletscan.thrift.ScanServerBusyException;
 +import org.apache.accumulo.core.tabletscan.thrift.TSampleNotPresentException;
 +import org.apache.accumulo.core.tabletscan.thrift.TSamplerConfiguration;
 +import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService;
 +import org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException;
  import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
  import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
- import org.apache.accumulo.core.util.Halt;
 -import org.apache.accumulo.core.tabletserver.thrift.ScanServerBusyException;
 -import 
org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
 -import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
 -import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
 -import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
 -import org.apache.accumulo.core.trace.thrift.TInfo;
 -import org.apache.accumulo.core.util.HostAndPort;
 +import org.apache.accumulo.core.util.Timer;
  import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.accumulo.core.util.threads.ThreadPools;
  import org.apache.accumulo.server.AbstractServer;
@@@ -346,25 -348,13 +345,9 @@@ public class ScanServer extends Abstrac
  
        serverLockUUID = UUID.randomUUID();
        scanServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, 
serverLockUUID);
- 
-       LockWatcher lw = new LockWatcher() {
- 
-         @Override
-         public void lostLock(final LockLossReason reason) {
-           Halt.halt(serverStopRequested ? 0 : 1, () -> {
-             if (!serverStopRequested) {
-               LOG.error("Lost tablet server lock (reason = {}), exiting.", 
reason);
-             }
-             context.getLowMemoryDetector().logGCInfo(getConfiguration());
-           });
-         }
- 
-         @Override
-         public void unableToMonitorLockNode(final Exception e) {
-           Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server 
lock, exiting.", e));
-         }
-       };
+       LockWatcher lw = new ServiceLockWatcher("scan server", () -> 
serverStopRequested,
 -          (name) -> gcLogger.logGCInfo(getConfiguration()));
++          (name) -> 
context.getLowMemoryDetector().logGCInfo(getConfiguration()));
  
 -      // Don't use the normal ServerServices lock content, instead put the 
server UUID here.
 -      byte[] lockContent = (serverLockUUID.toString() + "," + 
groupName).getBytes(UTF_8);
 -
 -      // wait for 120 seconds with 5 second delay
        for (int i = 0; i < 120 / 5; i++) {
          zoo.putPersistentData(zLockPath.toString(), new byte[0], 
NodeExistsPolicy.SKIP);
  
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 520e140352,3a3ad3f3b1..9d87e7fea6
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -77,21 -79,14 +77,21 @@@ import org.apache.accumulo.core.fate.zo
  import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
  import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
  import 
org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
 +import org.apache.accumulo.core.lock.ServiceLock;
- import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
 +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
 +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
 +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
++import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher;
 +import org.apache.accumulo.core.manager.thrift.BulkImportState;
 +import org.apache.accumulo.core.manager.thrift.Compacting;
  import org.apache.accumulo.core.manager.thrift.ManagerClientService;
 -import org.apache.accumulo.core.master.thrift.BulkImportState;
 -import org.apache.accumulo.core.master.thrift.Compacting;
 -import org.apache.accumulo.core.master.thrift.TableInfo;
 -import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 -import org.apache.accumulo.core.metadata.MetadataTable;
 -import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.manager.thrift.TableInfo;
 +import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.metadata.AccumuloTable;
  import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
  import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
  import org.apache.accumulo.core.metrics.MetricsInfo;
  import org.apache.accumulo.core.rpc.ThriftUtil;
@@@ -100,7 -95,7 +100,6 @@@ import org.apache.accumulo.core.spi.fs.
  import org.apache.accumulo.core.tabletserver.log.LogEntry;
  import org.apache.accumulo.core.trace.TraceUtil;
  import org.apache.accumulo.core.util.ComparablePair;
- import org.apache.accumulo.core.util.Halt;
 -import org.apache.accumulo.core.util.HostAndPort;
  import org.apache.accumulo.core.util.MapCounter;
  import org.apache.accumulo.core.util.Pair;
  import org.apache.accumulo.core.util.Retry;
@@@ -627,28 -676,13 +626,12 @@@ public class TabletServer extends Abstr
          throw e;
        }
  
 -      tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, 
UUID.randomUUID());
 +      UUID tabletServerUUID = UUID.randomUUID();
 +      tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, 
tabletServerUUID);
  
-       LockWatcher lw = new LockWatcher() {
- 
-         @Override
-         public void lostLock(final LockLossReason reason) {
-           Halt.halt(serverStopRequested ? 0 : 1, () -> {
-             if (!serverStopRequested) {
-               log.error("Lost tablet server lock (reason = {}), exiting.", 
reason);
-             }
-             context.getLowMemoryDetector().logGCInfo(getConfiguration());
-           });
-         }
- 
-         @Override
-         public void unableToMonitorLockNode(final Exception e) {
-           Halt.halt(1, () -> log.error("Lost ability to monitor tablet server 
lock, exiting.", e));
- 
-         }
-       };
+       LockWatcher lw = new ServiceLockWatcher("tablet server", () -> 
serverStopRequested,
 -          (name) -> gcLogger.logGCInfo(getConfiguration()));
++          (name) -> 
context.getLowMemoryDetector().logGCInfo(getConfiguration()));
  
 -      byte[] lockContent = new ServerServices(getClientAddressString(), 
Service.TSERV_CLIENT)
 -          .toString().getBytes(UTF_8);
        for (int i = 0; i < 120 / 5; i++) {
          zoo.putPersistentData(zLockPath.toString(), new byte[0], 
NodeExistsPolicy.SKIP);
  

Reply via email to