This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 81c7f3db483065bbaff1acf655d1eb3ad5ef55c0 Merge: c48adeaa23 b3424a86b9 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Jan 6 19:37:18 2025 +0000 Merge branch '2.1' into 3.1 .../accumulo/core/lock/ServiceLockSupport.java | 152 +++++++++++++++++++++ .../coordinator/CompactionCoordinator.java | 5 +- .../coordinator/CoordinatorLockWatcher.java | 94 ------------- .../org/apache/accumulo/compactor/Compactor.java | 19 +-- .../apache/accumulo/gc/SimpleGarbageCollector.java | 41 +++--- .../java/org/apache/accumulo/manager/Manager.java | 69 +--------- .../java/org/apache/accumulo/monitor/Monitor.java | 60 +------- .../org/apache/accumulo/tserver/ScanServer.java | 23 +--- .../org/apache/accumulo/tserver/TabletServer.java | 23 +--- 9 files changed, 192 insertions(+), 294 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java index 0000000000,b4d95a703a..3129511b5e mode 000000,100644..100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java @@@ -1,0 -1,152 +1,152 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ -package org.apache.accumulo.core.fate.zookeeper; ++package org.apache.accumulo.core.lock; + + import java.util.function.Consumer; + import java.util.function.Supplier; + -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.AccumuloLockWatcher; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; ++import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher; ++import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; ++import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; + import org.apache.accumulo.core.util.Halt; + import org.apache.zookeeper.KeeperException.NoAuthException; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class ServiceLockSupport { + + /** + * Lock Watcher used by Highly Available services. These are services where only one instance is + * running at a time, but another backup service can be started that will be used if the active + * service instance fails and loses its lock in ZK. 
+ */ + public static class HAServiceLockWatcher implements AccumuloLockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class); + + private final String serviceName; + private volatile boolean acquiredLock = false; + private volatile boolean failedToAcquireLock = false; + + public HAServiceLockWatcher(String serviceName) { + this.serviceName = serviceName; + } + + @Override + public void lostLock(LockLossReason reason) { + Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility + Halt.halt(-1, + () -> LOG.error("FATAL: No longer able to monitor {} lock node", serviceName, e)); + + } + + @Override + public synchronized void acquiredLock() { + LOG.debug("Acquired {} lock", serviceName); + + if (acquiredLock || failedToAcquireLock) { + Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + acquiredLock = true; + notifyAll(); + } + + @Override + public synchronized void failedToAcquireLock(Exception e) { + LOG.warn("Failed to get {} lock", serviceName, e); + + if (e instanceof NoAuthException) { + String msg = + "Failed to acquire " + serviceName + " lock due to incorrect ZooKeeper authentication."; + LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); + Halt.halt(msg, -1); + } + + if (acquiredLock) { + Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock, + -1); + } + + failedToAcquireLock = true; + notifyAll(); + } + + public synchronized void waitForChange() { + while (!acquiredLock && !failedToAcquireLock) { + try { + LOG.info("{} lock held by someone else, waiting for a change in state", serviceName); + wait(); + } catch (InterruptedException e) { + // empty + } + } + } + + public boolean 
isLockAcquired() { + return acquiredLock; + } + + public boolean isFailedToAcquireLock() { + return failedToAcquireLock; + } + + } + + /** + * Lock Watcher used by non-HA services + */ + public static class ServiceLockWatcher implements LockWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class); + + private final String serviceName; + private final Supplier<Boolean> shuttingDown; + private final Consumer<String> lostLockAction; + + public ServiceLockWatcher(String serviceName, Supplier<Boolean> shuttingDown, + Consumer<String> lostLockAction) { + this.serviceName = serviceName; + this.shuttingDown = shuttingDown; + this.lostLockAction = lostLockAction; + } + + @Override + public void lostLock(final LockLossReason reason) { + Halt.halt(1, () -> { + if (!shuttingDown.get()) { + LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason); + } + lostLockAction.accept(serviceName); + }); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, exiting.", serviceName, e)); + } + + } + + } diff --cc server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index fd4e27fc65,7d853b4808..09d1852f46 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@@ -60,10 -57,9 +60,11 @@@ import org.apache.accumulo.core.conf.Pr import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import 
org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; ++import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; @@@ -219,12 -224,11 +220,12 @@@ public class CompactionCoordinator exte ServiceLock.path(lockPath), zooLockUUID); while (true) { - CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher(); + HAServiceLockWatcher coordinatorLockWatcher = new HAServiceLockWatcher("coordinator"); - coordinatorLock.lock(coordinatorLockWatcher, coordinatorClientAddress.getBytes(UTF_8)); + coordinatorLock.lock(coordinatorLockWatcher, + new ServiceLockData(zooLockUUID, coordinatorClientAddress, ThriftService.COORDINATOR)); coordinatorLockWatcher.waitForChange(); - if (coordinatorLockWatcher.isAcquiredLock()) { + if (coordinatorLockWatcher.isLockAcquired()) { break; } if (!coordinatorLockWatcher.isFailedToAcquireLock()) { diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 3fe294ad62,f95fad41ab..3d60d89fb1 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -74,15 -73,8 +74,15 @@@ import org.apache.accumulo.core.fate.zo import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; +import org.apache.accumulo.core.lock.ServiceLock; - import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; +import org.apache.accumulo.core.lock.ServiceLockData; +import 
org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; ++import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; -import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@@ -98,8 -90,10 +98,7 @@@ import org.apache.accumulo.core.tablets import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; - import org.apache.accumulo.core.util.Halt; -import org.apache.accumulo.core.trace.thrift.TInfo; -import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.core.util.ServerServices; -import org.apache.accumulo.core.util.ServerServices.Service; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; @@@ -273,22 -287,13 +272,10 @@@ public class Compactor extends Abstract compactorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), ServiceLock.path(zPath), compactorId); - LockWatcher lw = new LockWatcher() { - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(1, () -> { - LOG.error("Compactor lost lock (reason = {}), exiting.", reason); - getContext().getLowMemoryDetector().logGCInfo(getConfiguration()); - }); - } -- - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost 
ability to monitor Compactor lock, exiting.", e)); - } - }; + LockWatcher lw = new ServiceLockWatcher("compactor", () -> false, - (name) -> gcLogger.logGCInfo(getConfiguration())); ++ (name) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration())); try { - byte[] lockContent = - new ServerServices(hostPort, Service.COMPACTOR_CLIENT).toString().getBytes(UTF_8); for (int i = 0; i < 25; i++) { zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP); diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 62c2280b84,9658a21e7a..adc15bea23 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@@ -35,17 -34,18 +35,15 @@@ import org.apache.accumulo.core.conf.Pr import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.lock.ServiceLock; - import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; - import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; ++import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.trace.TraceUtil; - import org.apache.accumulo.core.util.Halt; -import org.apache.accumulo.core.trace.thrift.TInfo; -import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.core.util.ServerServices; -import 
org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.gc.metrics.GcCycleMetrics; import org.apache.accumulo.gc.metrics.GcMetrics; @@@ -336,31 -363,32 +334,32 @@@ public class SimpleGarbageCollector ext private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException { var path = ServiceLock.path(getContext().getZooKeeperRoot() + Constants.ZGC_LOCK); - LockWatcher lockWatcher = new LockWatcher() { - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1); - } + UUID zooLockUUID = UUID.randomUUID(); + gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Level changed to error and FATAL added to message for slf4j compatibility - Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor lock node ", e)); + while (true) { + HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher("gc"); + gcLock.lock(gcLockWatcher, - new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes(UTF_8)); ++ new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC)); + gcLockWatcher.waitForChange(); + + if (gcLockWatcher.isLockAcquired()) { + break; } - }; - UUID zooLockUUID = UUID.randomUUID(); - gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); - while (true) { - if (gcLock.tryLock(lockWatcher, - new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC))) { - log.debug("Got GC ZooKeeper lock"); - return; + if (!gcLockWatcher.isFailedToAcquireLock()) { + throw new IllegalStateException("gc lock in unknown state"); } + + gcLock.tryToCancelAsyncLockOrUnlock(); + log.debug("Failed to get GC ZooKeeper lock, will retry"); - sleepUninterruptibly(1, TimeUnit.SECONDS); + 
sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); } + + log.info("Got GC lock."); + } private HostAndPort startStatsService() { diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index dfb672013f,7ca35732cc..c52f6efcfb --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -73,11 -75,6 +73,11 @@@ import org.apache.accumulo.core.fate.zo import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.core.lock.ServiceLock; - import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; +import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; ++import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; @@@ -103,14 -103,11 +103,13 @@@ import org.apache.accumulo.core.spi.bal import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; -import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; +import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; import org.apache.accumulo.core.trace.TraceUtil; - import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; +import 
org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.manager.metrics.BalancerMetrics; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; @@@ -1564,66 -1635,7 +1562,7 @@@ public class Manager extends AbstractSe return managerLock; } - private static class ManagerLockWatcher implements ServiceLock.AccumuloLockWatcher { - - boolean acquiredLock = false; - boolean failedToAcquireLock = false; - - @Override - public void lostLock(LockLossReason reason) { - Halt.halt("Manager lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor manager lock node", e)); - - } - - @Override - public synchronized void acquiredLock() { - log.debug("Acquired manager lock"); - - if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); - } - - acquiredLock = true; - notifyAll(); - } - - @Override - public synchronized void failedToAcquireLock(Exception e) { - log.warn("Failed to get manager lock", e); - - if (e instanceof NoAuthException) { - String msg = "Failed to acquire manager lock due to incorrect ZooKeeper authentication."; - log.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(msg, -1); - } - - if (acquiredLock) { - Halt.halt("Zoolock in unexpected state acquiredLock true with FAL " + failedToAcquireLock, - -1); - } - - failedToAcquireLock = true; - notifyAll(); - } - - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - wait(); - } catch (InterruptedException e) { - // empty - } - } - } - } - - private void getManagerLock(final ServiceLockPath zManagerLoc) + private 
ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc) throws KeeperException, InterruptedException { var zooKeeper = getContext().getZooReaderWriter().getZooKeeper(); log.info("trying to get manager lock"); @@@ -1638,8 -1647,8 +1577,8 @@@ managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID); while (true) { - ManagerLockWatcher managerLockWatcher = new ManagerLockWatcher(); + HAServiceLockWatcher managerLockWatcher = new HAServiceLockWatcher("manager"); - managerLock.lock(managerLockWatcher, managerClientAddress.getBytes(UTF_8)); + managerLock.lock(managerLockWatcher, sld); managerLockWatcher.waitForChange(); diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 239394f6ab,17ed173714..839dfd0eae --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@@ -53,25 -58,23 +53,24 @@@ import org.apache.accumulo.core.fate.zo import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.core.gc.thrift.GCMonitorService; import org.apache.accumulo.core.gc.thrift.GCStatus; +import org.apache.accumulo.core.lock.ServiceLock; - import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; ++import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; import org.apache.accumulo.core.manager.thrift.ManagerClientService; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; -import org.apache.accumulo.core.master.thrift.TableInfo; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.manager.thrift.TableInfo; +import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import 
org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.tabletscan.thrift.ActiveScan; +import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService; import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; -import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; -import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService; +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client; import org.apache.accumulo.core.trace.TraceUtil; - import org.apache.accumulo.core.util.Halt; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.ServerServices; -import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo; @@@ -754,9 -857,8 +753,9 @@@ public class Monitor extends AbstractSe monitorLock = new ServiceLock(zoo.getZooKeeper(), monitorLockPath, zooLockUUID); while (true) { - MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher(); + HAServiceLockWatcher monitorLockWatcher = new HAServiceLockWatcher("monitor"); - monitorLock.lock(monitorLockWatcher, new byte[0]); + monitorLock.lock(monitorLockWatcher, new ServiceLockData(zooLockUUID, + monitorLocation.getHost() + ":" + monitorLocation.getPort(), ThriftService.NONE)); monitorLockWatcher.waitForChange(); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 7fcd59dcc9,bfbcdeed9b..8db73cb001 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@@ -68,30 -70,23 +68,29 @@@ import org.apache.accumulo.core.fate.zo import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; +import org.apache.accumulo.core.lock.ServiceLock; - import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; ++import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; -import org.apache.accumulo.core.spi.scan.ScanServerSelector; -import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; +import org.apache.accumulo.core.tabletscan.thrift.ActiveScan; +import org.apache.accumulo.core.tabletscan.thrift.ScanServerBusyException; +import org.apache.accumulo.core.tabletscan.thrift.TSampleNotPresentException; +import org.apache.accumulo.core.tabletscan.thrift.TSamplerConfiguration; +import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService; +import org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import 
org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; - import org.apache.accumulo.core.util.Halt; -import org.apache.accumulo.core.tabletserver.thrift.ScanServerBusyException; -import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException; -import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration; -import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService; -import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException; -import org.apache.accumulo.core.trace.thrift.TInfo; -import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.AbstractServer; @@@ -346,25 -348,13 +345,9 @@@ public class ScanServer extends Abstrac serverLockUUID = UUID.randomUUID(); scanServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, serverLockUUID); - - LockWatcher lw = new LockWatcher() { - - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 0 : 1, () -> { - if (!serverStopRequested) { - LOG.error("Lost tablet server lock (reason = {}), exiting.", reason); - } - context.getLowMemoryDetector().logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server lock, exiting.", e)); - } - }; + LockWatcher lw = new ServiceLockWatcher("scan server", () -> serverStopRequested, - (name) -> gcLogger.logGCInfo(getConfiguration())); ++ (name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); - // Don't use the normal ServerServices lock content, instead put the server UUID here. 
- byte[] lockContent = (serverLockUUID.toString() + "," + groupName).getBytes(UTF_8); - - // wait for 120 seconds with 5 second delay for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 520e140352,3a3ad3f3b1..9d87e7fea6 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -77,21 -79,14 +77,21 @@@ import org.apache.accumulo.core.fate.zo import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; +import org.apache.accumulo.core.lock.ServiceLock; - import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; ++import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; +import org.apache.accumulo.core.manager.thrift.BulkImportState; +import org.apache.accumulo.core.manager.thrift.Compacting; import org.apache.accumulo.core.manager.thrift.ManagerClientService; -import org.apache.accumulo.core.master.thrift.BulkImportState; -import org.apache.accumulo.core.master.thrift.Compacting; -import org.apache.accumulo.core.master.thrift.TableInfo; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.manager.thrift.TableInfo; +import 
org.apache.accumulo.core.manager.thrift.TabletServerStatus; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.ThriftUtil; @@@ -100,7 -95,7 +100,6 @@@ import org.apache.accumulo.core.spi.fs. import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.ComparablePair; - import org.apache.accumulo.core.util.Halt; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.MapCounter; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.Retry; @@@ -627,28 -676,13 +626,12 @@@ public class TabletServer extends Abstr throw e; } - tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, UUID.randomUUID()); + UUID tabletServerUUID = UUID.randomUUID(); + tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, tabletServerUUID); - LockWatcher lw = new LockWatcher() { - - @Override - public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 
0 : 1, () -> { - if (!serverStopRequested) { - log.error("Lost tablet server lock (reason = {}), exiting.", reason); - } - context.getLowMemoryDetector().logGCInfo(getConfiguration()); - }); - } - - @Override - public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> log.error("Lost ability to monitor tablet server lock, exiting.", e)); - - } - }; + LockWatcher lw = new ServiceLockWatcher("tablet server", () -> serverStopRequested, - (name) -> gcLogger.logGCInfo(getConfiguration())); ++ (name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); - byte[] lockContent = new ServerServices(getClientAddressString(), Service.TSERV_CLIENT) - .toString().getBytes(UTF_8); for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);