This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit c4385208de898278460b8f9063d0245b89d38dcf Merge: 4777675d7e 9fee5991fb Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Jan 31 19:53:34 2025 +0000 Merge branch '3.1' .../accumulo/core/lock/ServiceLockSupport.java | 30 +- .../accumulo/core/rpc/clients/ManagerClient.java | 10 +- ....java => ServerProcessServiceThriftClient.java} | 35 +- .../core/rpc/clients/ThriftClientTypes.java | 3 + .../core/util/threads/ThreadPoolNames.java | 1 + core/src/main/scripts/generate-thrift.sh | 2 +- core/src/main/spotbugs/exclude-filter.xml | 1 + .../core/manager/thrift/ManagerClientService.java | 1271 ++++++++++++++++++++ .../core/process/thrift/ServerProcessService.java | 619 ++++++++++ core/src/main/thrift/manager.thrift | 9 + core/src/main/thrift/process.thrift | 31 + .../MiniAccumuloClusterControl.java | 64 + .../org/apache/accumulo/server/AbstractServer.java | 66 +- .../accumulo/server/manager/LiveTServerSet.java | 22 +- .../accumulo/server/rpc/ThriftProcessorTypes.java | 35 +- .../org/apache/accumulo/server/util/Admin.java | 33 + .../server/zookeeper/DistributedWorkQueue.java | 26 +- .../org/apache/accumulo/compactor/Compactor.java | 337 +++--- .../apache/accumulo/gc/SimpleGarbageCollector.java | 255 ++-- .../java/org/apache/accumulo/manager/Manager.java | 32 +- .../manager/ManagerClientServiceHandler.java | 12 + .../accumulo/manager/recovery/RecoveryManager.java | 4 +- .../apache/accumulo/monitor/EmbeddedWebServer.java | 2 +- .../java/org/apache/accumulo/monitor/Monitor.java | 19 +- .../org/apache/accumulo/tserver/ScanServer.java | 56 +- .../org/apache/accumulo/tserver/TabletServer.java | 116 +- .../org/apache/accumulo/tserver/log/LogSorter.java | 17 +- .../apache/accumulo/tserver/ScanServerTest.java | 5 + .../tserver/log/RecoveryLogsIteratorTest.java | 11 +- .../tserver/log/SortedLogRecoveryTest.java | 48 +- .../tserver/log/TestUpgradePathForWALogs.java | 17 +- .../accumulo/test/SelfStoppingScanServer.java | 2 +- .../compaction/ExternalDoNothingCompactor.java | 2 +- .../accumulo/test/fate/FateExecutionOrderIT.java | 1 + .../test/functional/GracefulShutdownIT.java | 295 +++++ 35 files changed, 3066 insertions(+), 423 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java index dda14f50c0,fd7a5bcd85..d4e0cd0b58 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java @@@ -66,17 -40,24 +66,24 @@@ public class ServiceLockSupport private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class); - private final String serviceName; + private final Type server; + private final Supplier<Boolean> shutdownComplete; private volatile boolean acquiredLock = false; private volatile boolean failedToAcquireLock = false; - public HAServiceLockWatcher(Type server) { - public HAServiceLockWatcher(String serviceName, Supplier<Boolean> shutdownComplete) { - this.serviceName = serviceName; ++ public HAServiceLockWatcher(Type server, Supplier<Boolean> shutdownComplete) { + this.server = server; + this.shutdownComplete = shutdownComplete; } @Override public void lostLock(LockLossReason reason) { - Halt.halt(server + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + if (shutdownComplete.get()) { - LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", - serviceName, reason); ++ LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", server, ++ reason); + } else { - 
Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1); ++ Halt.halt(server + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + } } @Override @@@ -146,25 -128,28 +153,28 @@@ private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class); - private final String serviceName; + private final Type server; - private final Supplier<Boolean> shuttingDown; + private final Supplier<Boolean> shutdownComplete; - private final Consumer<String> lostLockAction; + private final Consumer<Type> lostLockAction; - public ServiceLockWatcher(Type server, Supplier<Boolean> shuttingDown, - public ServiceLockWatcher(String serviceName, Supplier<Boolean> shutdownComplete, - Consumer<String> lostLockAction) { - this.serviceName = serviceName; ++ public ServiceLockWatcher(Type server, Supplier<Boolean> shutdownComplete, + Consumer<Type> lostLockAction) { + this.server = server; - this.shuttingDown = shuttingDown; + this.shutdownComplete = shutdownComplete; this.lostLockAction = lostLockAction; } @Override public void lostLock(final LockLossReason reason) { - Halt.halt(1, () -> { - if (!shuttingDown.get()) { + if (shutdownComplete.get()) { - LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", - serviceName, reason); ++ LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", server, ++ reason); + } else { + Halt.halt(1, () -> { - LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason); - lostLockAction.accept(serviceName); + LOG.error("{} lost lock (reason = {}), exiting.", server, reason); - } - lostLockAction.accept(server); - }); ++ lostLockAction.accept(server); + }); + } } @Override diff --cc core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java index 29057cf3cb,d0076e69f1..5127807e0b --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java @@@ -44,7 -43,11 +44,15 @@@ public interface ManagerClient<C extend return null; } - HostAndPort manager = HostAndPort.fromString(managers.iterator().next().toHostPortString()); - HostAndPort manager = HostAndPort.fromString(locations.get(0)); - if (manager.getPort() == 0) { ++ final String managerLocation = managers.iterator().next().toHostPortString(); ++ if (managerLocation.equals("0.0.0.0:0")) { ++ // The Manager creates the lock with an initial address of 0.0.0.0:0, then ++ // later updates the lock contents with the actual address after everything ++ // is started. 
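++ // Returning null here signals callers to retry later instead of trying to ++ // connect to the placeholder address.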
++ log.debug("Manager is up and lock acquired, waiting for address..."); + return null; + } - ++ HostAndPort manager = HostAndPort.fromString(managerLocation); try { // Manager requests can take a long time: don't ever time out return ThriftUtil.getClientNoTimeout(type, manager, context); diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index 53ffdf1443,b8368c9e61..57164165cf --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@@ -24,10 -24,11 +24,12 @@@ import java.io.IOException import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; ++import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; + import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@@ -514,12 -529,67 +516,74 @@@ public class MiniAccumuloClusterContro stop(server, hostname); } + public List<Process> getCompactors(String resourceGroup) { + return compactorProcesses.get(resourceGroup); + } + + public List<Process> getTabletServers(String resourceGroup) { + return tabletServerProcesses.get(resourceGroup); + } + + public void refreshProcesses(ServerType type) { + switch (type) { - case COMPACTION_COORDINATOR: - if (!coordinatorProcess.isAlive()) { - coordinatorProcess = null; - } - break; + case COMPACTOR: - compactorProcesses.removeIf(process -> !process.isAlive()); ++ compactorProcesses.forEach((k, v) -> v.removeIf(process -> !process.isAlive())); + break; + case GARBAGE_COLLECTOR: + if (!gcProcess.isAlive()) { + gcProcess = null; + } + break; + case MANAGER: + if (!managerProcess.isAlive()) { + managerProcess = null; + } + break; + case MONITOR: + if (!monitor.isAlive()) { + monitor = null; + } + break; + case SCAN_SERVER: - scanServerProcesses.removeIf(process -> !process.isAlive()); ++ scanServerProcesses.forEach((k, v) -> v.removeIf(process -> !process.isAlive())); + break; + case TABLET_SERVER: - tabletServerProcesses.removeIf(process -> !process.isAlive()); ++ tabletServerProcesses.forEach((k, v) -> v.removeIf(process -> !process.isAlive())); + break; + case ZOOKEEPER: + if (!zooKeeperProcess.isAlive()) { + zooKeeperProcess = null; + } + break; + default: + throw new IllegalArgumentException("Unhandled type: " + type); + } + } + + public Set<Process> getProcesses(ServerType type) { + switch (type) { - case COMPACTION_COORDINATOR: - return coordinatorProcess == null ? Set.of() : Set.of(coordinatorProcess); + case COMPACTOR: - return Set.copyOf(compactorProcesses); ++ Set<Process> cprocesses = new HashSet<>(); ++ compactorProcesses.values().forEach(list -> list.forEach(cprocesses::add)); ++ return cprocesses; + case GARBAGE_COLLECTOR: + return gcProcess == null ? Set.of() : Set.of(gcProcess); + case MANAGER: + return managerProcess == null ? Set.of() : Set.of(managerProcess); + case MONITOR: + return monitor == null ? 
Set.of() : Set.of(monitor); + case SCAN_SERVER: - return Set.copyOf(scanServerProcesses); ++ Set<Process> sprocesses = new HashSet<>(); ++ scanServerProcesses.values().forEach(list -> list.forEach(sprocesses::add)); ++ return sprocesses; + case TABLET_SERVER: - return Set.copyOf(tabletServerProcesses); ++ Set<Process> tprocesses = new HashSet<>(); ++ tabletServerProcesses.values().forEach(list -> list.forEach(tprocesses::add)); ++ return tprocesses; + case ZOOKEEPER: + return zooKeeperProcess == null ? Set.of() : Set.of(zooKeeperProcess); + default: + throw new IllegalArgumentException("Unhandled type: " + type); + } + } } diff --cc server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index 2eb814ac23,115cbb424f..268d8f5dbb --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@@ -20,21 -20,21 +20,25 @@@ package org.apache.accumulo.server import static java.util.concurrent.TimeUnit.MILLISECONDS; +import java.util.List; import java.util.OptionalInt; import java.util.concurrent.ScheduledFuture; + import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.cli.ConfigOpts; + import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.conf.cluster.ClusterConfigParser; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metrics.MetricsProducer; + import org.apache.accumulo.core.process.thrift.ServerProcessService; + import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Timer; @@@ -62,9 -62,10 +67,11 @@@ public abstract class AbstractServe private volatile Timer idlePeriodTimer = null; private volatile Thread serverThread; private volatile Thread verificationThread; + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final AtomicBoolean shutdownComplete = new AtomicBoolean(false); - protected AbstractServer(String appName, ConfigOpts opts, String[] args) { + protected AbstractServer(String appName, ConfigOpts opts, + Function<SiteConfiguration,ServerContext> serverContextFactory, String[] args) { this.applicationName = appName; opts.parseArgs(appName, args); var siteConfig = opts.getSiteConfiguration(); @@@ -120,16 -119,62 +127,70 @@@ } } + protected String getResourceGroupPropertyValue(SiteConfiguration conf) { + return Constants.DEFAULT_RESOURCE_GROUP_NAME; + } + + public String getResourceGroup() { + return resourceGroup; + } + + @Override + public void gracefulShutdown(TCredentials credentials) { + + try { + if (!context.getSecurityOperation().canPerformSystemActions(credentials)) { + log.warn("Ignoring shutdown request, user " + credentials.getPrincipal() + + " does not have the appropriate permissions."); + return; + } + } catch (ThriftSecurityException e) { + log.error( + "Error trying to determine if user has permissions to shutdown server, ignoring request", + e); + return; + } + + if (shutdownRequested.compareAndSet(false, true)) { + // Don't interrupt
the server thread, as that will cause + // IO operations to fail as the servers are finishing + // their work. + log.info("Graceful shutdown initiated."); + } else { + log.warn("Graceful shutdown previously requested."); + } + } + + public boolean isShutdownRequested() { + return shutdownRequested.get(); + } + + public AtomicBoolean getShutdownComplete() { + return shutdownComplete; + } + /** - * Run this server in a main thread + * Run this server in a main thread. The server's run method should set up the server, then loop + * until isShutdownRequested() returns true, like so: + * + * <pre> + * public void run() { + * // setup server and start threads + * while (!isShutdownRequested()) { + * if (Thread.currentThread().isInterrupted()) { + * LOG.info("Server process thread has been interrupted, shutting down"); + * break; + * } + * try { + * // sleep or other things + * } catch (InterruptedException e) { + * gracefulShutdown(getContext().rpcCreds()); + * } + * } + * // shut down server + * getShutdownComplete().set(true); + * ServiceLock.unlock(serverLock); + * } + * </pre> */ public void runServer() throws Exception { final AtomicReference<Throwable> err = new AtomicReference<>(); diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index f20aa6dfb5,9b7057b689..ab593585b0 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@@ -27,8 -26,8 +27,9 @@@ import java.util.HashMap import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@@ -200,14 -230,16 +201,17 @@@ public class LiveTServerSet implements } } - // The set of active tservers with locks, indexed by their name in zookeeper + // The set of active tservers with locks, indexed by their name in zookeeper. When the contents of + // this map are modified, tServersSnapshot should be set to null.
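+ // getSnapshot() lazily rebuilds and caches the snapshot while it is null.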
private final Map<String,TServerInfo> current = new HashMap<>(); - // as above, indexed by TServerInstance - private final Map<TServerInstance,TServerInfo> currentInstances = new HashMap<>(); + + private LiveTServersSnapshot tServersSnapshot = null; + private final ConcurrentHashMap<String,TServerInfo> serversShuttingDown = + new ConcurrentHashMap<>(); + // The set of entries in zookeeper without locks, and the first time each was noticed - private final Map<String,Long> locklessServers = new HashMap<>(); + private final Map<ServiceLockPath,Long> locklessServers = new HashMap<>(); private final Supplier<ZooCache> zcSupplier; @@@ -259,34 -307,33 +276,35 @@@ } private synchronized void checkServer(final Set<TServerInstance> updates, - final Set<TServerInstance> doomed, final String path, final String zPath) + final Set<TServerInstance> doomed, final ServiceLockPath tserverPath) throws InterruptedException, KeeperException { - TServerInfo info = current.get(zPath); + // invalidate the snapshot forcing it to be recomputed the next time it's requested + tServersSnapshot = null; + + final TServerInfo info = current.get(tserverPath.getServer()); - final var zLockPath = ServiceLock.path(path + "/" + zPath); ZcStat stat = new ZcStat(); - HostAndPort address = ServiceLock.getLockData(getZooCache(), zLockPath, stat) - .map(sld -> sld.getAddress(ServiceLockData.ThriftService.TSERV)).orElse(null); + Optional<ServiceLockData> sld = ServiceLock.getLockData(getZooCache(), tserverPath, stat); - if (address == null) { + if (sld.isEmpty()) { if (info != null) { doomed.add(info.instance); - current.remove(zPath); - currentInstances.remove(info.instance); - serversShuttingDown.remove(zPath); + current.remove(tserverPath.getServer()); ++ serversShuttingDown.remove(tserverPath.toString()); } - Long firstSeen = locklessServers.get(zPath); + Long firstSeen = locklessServers.get(tserverPath); if (firstSeen == null) { - locklessServers.put(zPath, System.currentTimeMillis()); + locklessServers.put(tserverPath, System.currentTimeMillis()); } else if (System.currentTimeMillis() - firstSeen > MINUTES.toMillis(10)) { - deleteServerNode(path + "/" + zPath); - locklessServers.remove(zPath); + deleteServerNode(tserverPath.toString()); + locklessServers.remove(tserverPath); } } else { - locklessServers.remove(zPath); + locklessServers.remove(tserverPath); + HostAndPort address = sld.orElseThrow().getAddress(ServiceLockData.ThriftService.TSERV); + String resourceGroup = sld.orElseThrow().getGroup(ServiceLockData.ThriftService.TSERV); TServerInstance instance = new TServerInstance(address, stat.getEphemeralOwner()); if (info == null) { @@@ -346,68 -397,10 +364,70 @@@ return tServerInfo.connection; } + public synchronized String getResourceGroup(TServerInstance server) { + if (server == null) { + return null; + } + TServerInfo tServerInfo = getSnapshot().tserversInfo.get(server); + if (tServerInfo == null) { + return null; + } + return tServerInfo.resourceGroup; + } + + public static class LiveTServersSnapshot { + private final Set<TServerInstance> tservers; + private final Map<String,Set<TServerInstance>> tserverGroups; + + // TServerInfo is only for internal use, so this field is private w/o a getter.
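+ // It is null in snapshots built with the @VisibleForTesting constructor below.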
+ private final Map<TServerInstance,TServerInfo> tserversInfo; + + @VisibleForTesting + public LiveTServersSnapshot(Set<TServerInstance> currentServers, + Map<String,Set<TServerInstance>> serverGroups) { + this.tserversInfo = null; + this.tservers = Set.copyOf(currentServers); + Map<String,Set<TServerInstance>> copy = new HashMap<>(); + serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v))); + this.tserverGroups = Collections.unmodifiableMap(copy); + } + + public LiveTServersSnapshot(Map<TServerInstance,TServerInfo> currentServers, + Map<String,Set<TServerInstance>> serverGroups) { + this.tserversInfo = Map.copyOf(currentServers); + this.tservers = this.tserversInfo.keySet(); + Map<String,Set<TServerInstance>> copy = new HashMap<>(); + serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v))); + this.tserverGroups = Collections.unmodifiableMap(copy); + } + + public Set<TServerInstance> getTservers() { + return tservers; + } + + public Map<String,Set<TServerInstance>> getTserverGroups() { + return tserverGroups; + } + } + + public synchronized LiveTServersSnapshot getSnapshot() { + if (tServersSnapshot == null) { + HashMap<TServerInstance,TServerInfo> tServerInstances = new HashMap<>(); + Map<String,Set<TServerInstance>> tserversGroups = new HashMap<>(); + current.values().forEach(tServerInfo -> { + tServerInstances.put(tServerInfo.instance, tServerInfo); + tserversGroups.computeIfAbsent(tServerInfo.resourceGroup, rg -> new HashSet<>()) + .add(tServerInfo.instance); + }); + tServersSnapshot = new LiveTServersSnapshot(tServerInstances, tserversGroups); + } + return tServersSnapshot; + } + public synchronized Set<TServerInstance> getCurrentServers() { - return getSnapshot().getTservers(); - Set<TServerInstance> current = currentInstances.keySet(); ++ Set<TServerInstance> current = new HashSet<>(getSnapshot().getTservers()); + serversShuttingDown.values().forEach(tsi -> current.remove(tsi.instance)); - return new HashSet<>(current); ++ return current; } public synchronized int size() { diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java index 6f0a6086eb,59f4e154f5..0a56b4e32f --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java @@@ -105,23 -113,39 +113,30 @@@ public class ThriftProcessorTypes<C ext return muxProcessor; } - public static TMultiplexedProcessor getGcTProcessor(GCMonitorService.Iface serviceHandler, - public static TMultiplexedProcessor getCoordinatorTProcessor( - ServerProcessService.Iface processHandler, CompactionCoordinatorService.Iface serviceHandler, -- ServerContext context) { - TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor(); - muxProcessor.registerProcessor(SERVER_PROCESS.getServiceName(), - SERVER_PROCESS.getTProcessor(ServerProcessService.Processor.class, - ServerProcessService.Iface.class, processHandler, context)); - muxProcessor.registerProcessor(COORDINATOR.getServiceName(), - COORDINATOR.getTProcessor(CompactionCoordinatorService.Processor.class, - CompactionCoordinatorService.Iface.class, serviceHandler, context)); - return muxProcessor; - } - + public static TMultiplexedProcessor getGcTProcessor(ServerProcessService.Iface processHandler, + GCMonitorService.Iface serviceHandler, ServerContext context) { TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor(); + muxProcessor.registerProcessor(SERVER_PROCESS.getServiceName(), + 
SERVER_PROCESS.getTProcessor(ServerProcessService.Processor.class, + ServerProcessService.Iface.class, processHandler, context)); muxProcessor.registerProcessor(GC.getServiceName(), GC.getTProcessor( GCMonitorService.Processor.class, GCMonitorService.Iface.class, serviceHandler, context)); return muxProcessor; } - public static TMultiplexedProcessor getManagerTProcessor(FateService.Iface fateServiceHandler, + public static TMultiplexedProcessor getManagerTProcessor( + ServerProcessService.Iface processHandler, FateService.Iface fateServiceHandler, + CompactionCoordinatorService.Iface coordinatorServiceHandler, ManagerClientService.Iface managerServiceHandler, ServerContext context) { TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor(); + muxProcessor.registerProcessor(SERVER_PROCESS.getServiceName(), + SERVER_PROCESS.getTProcessor(ServerProcessService.Processor.class, + ServerProcessService.Iface.class, processHandler, context)); muxProcessor.registerProcessor(FATE.getServiceName(), FATE.getTProcessor( FateService.Processor.class, FateService.Iface.class, fateServiceHandler, context)); + muxProcessor.registerProcessor(COORDINATOR.getServiceName(), + COORDINATOR.getTProcessor(CompactionCoordinatorService.Processor.class, + CompactionCoordinatorService.Iface.class, coordinatorServiceHandler, context)); muxProcessor.registerProcessor(MANAGER.getServiceName(), MANAGER.getTProcessor(ManagerClientService.Processor.class, ManagerClientService.Iface.class, managerServiceHandler, context)); diff --cc server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 7983b4f92a,047c2c47d9..f8e8df3c52 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@@ -65,24 -57,15 +65,25 @@@ import org.apache.accumulo.core.conf.Ac import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.AdminUtil; -import org.apache.accumulo.core.fate.FateTxId; -import org.apache.accumulo.core.fate.ReadOnlyTStore; -import org.apache.accumulo.core.fate.ZooStore; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.thrift.FateService; +import org.apache.accumulo.core.manager.thrift.TFateId; import org.apache.accumulo.core.metadata.AccumuloTable; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; + import org.apache.accumulo.core.process.thrift.ServerProcessService; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.security.Authorizations; @@@ -661,9 -618,28 +675,28 @@@ public class Admin implements KeywordEx client -> client.shutdown(TraceUtil.traceInfo(), 
context.rpcCreds(), tabletServersToo)); } + // Visible for tests + public static void signalGracefulShutdown(final ClientContext context, String address) { + + Objects.requireNonNull(address, "address not set"); + final HostAndPort hp = HostAndPort.fromString(address); + ServerProcessService.Client client = null; + try { + client = ThriftClientTypes.SERVER_PROCESS.getServerProcessConnection(context, log, + hp.getHost(), hp.getPort()); + client.gracefulShutdown(context.rpcCreds()); + } catch (TException e) { + throw new RuntimeException("Error invoking graceful shutdown for server: " + hp, e); + } finally { + if (client != null) { + ThriftUtil.returnClient(client, context); + } + } + } + private static void stopTabletServer(final ClientContext context, List<String> servers, final boolean force) throws AccumuloException, AccumuloSecurityException { - if (context.getManagerLocations().isEmpty()) { + if (context.instanceOperations().getServers(ServerId.Type.MANAGER).isEmpty()) { log.info("No managers running. Not attempting safe unload of tserver."); return; } diff --cc server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java index fb374ce229,5617213b13..a52b63945c --- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java +++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java @@@ -58,9 -58,10 +59,9 @@@ public class DistributedWorkQueue private static final Logger log = LoggerFactory.getLogger(DistributedWorkQueue.class); - private ThreadPoolExecutor threadPool; private final ZooReaderWriter zoo; private final String path; - private final ServerContext context; + private final AbstractServer server; private final long timerInitialDelay; private final long timerPeriod; @@@ -192,19 -190,17 +198,23 @@@ } public ServerContext getContext() { - return context; + return server.getContext(); + } + + public AbstractServer getServer() { + return server; } - public void startProcessing(final Processor processor, ThreadPoolExecutor executorService) - throws KeeperException, InterruptedException { + public long getCheckInterval() { + return this.timerPeriod; + } - threadPool = executorService; + /** + * Finds the children at the path passed in the constructor and calls {@code lookForWork} which + * will attempt to process all of the currently available work + */ + public void processExistingWork(final Processor processor, ExecutorService executor, + final int maxThreads, boolean setWatch) throws KeeperException, InterruptedException { zoo.mkdirs(path); zoo.mkdirs(path + "/" + LOCKS_NODE); diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index e61ab5a112,eb926da7cf..a954b687bc --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -274,13 -251,26 +271,13 @@@ public class Compactor extends Abstract protected void announceExistence(HostAndPort clientAddress) throws KeeperException, InterruptedException { - String hostPort = ExternalCompactionUtil.getHostPortString(clientAddress); - - ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter(); - String compactorQueuePath = - getContext().getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + this.queueName; - String zPath = compactorQueuePath + "/" + hostPort; - - try { - zoo.mkdirs(compactorQueuePath); - zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP); - } catch 
(KeeperException.NoAuthException e) { - LOG.error("Failed to write to ZooKeeper. Ensure that" - + " accumulo.properties, specifically instance.secret, is consistent."); - throw e; - } - - compactorLock = - new ServiceLock(getContext().getZooSession(), ServiceLock.path(zPath), compactorId); - LockWatcher lw = new ServiceLockWatcher("compactor", () -> getShutdownComplete().get(), - (name) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration())); + final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter(); + final ServiceLockPath path = + getContext().getServerPaths().createCompactorPath(getResourceGroup(), clientAddress); + ServiceLockSupport.createNonHaServiceLockPath(Type.COMPACTOR, zoo, path); + compactorLock = new ServiceLock(getContext().getZooSession(), path, compactorId); - LockWatcher lw = new ServiceLockWatcher(Type.COMPACTOR, () -> false, ++ LockWatcher lw = new ServiceLockWatcher(Type.COMPACTOR, () -> getShutdownComplete().get(), + (type) -> getContext().getLowMemoryDetector().logGCInfo(getConfiguration())); try { for (int i = 0; i < 25; i++) { @@@ -320,10 -305,10 +317,10 @@@ * @throws UnknownHostException host unknown */ protected ServerAddress startCompactorClientService() throws UnknownHostException { - ClientServiceHandler clientHandler = - new ClientServiceHandler(getContext(), new TransactionWatcher(getContext())); - var processor = - ThriftProcessorTypes.getCompactorTProcessor(this, clientHandler, this, getContext()); + + ClientServiceHandler clientHandler = new ClientServiceHandler(getContext()); - var processor = ThriftProcessorTypes.getCompactorTProcessor(clientHandler, ++ var processor = ThriftProcessorTypes.getCompactorTProcessor(this, clientHandler, + getCompactorThriftHandlerInterface(), getContext()); ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(), Property.COMPACTOR_CLIENTPORT, processor, this.getClass().getSimpleName(), "Thrift Client Server", Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS, @@@ -697,189 -682,184 +694,194 @@@ try { final AtomicReference<Throwable> err = new AtomicReference<>(); - final LogSorter logSorter = new LogSorter(getContext(), getConfiguration()); ++ final LogSorter logSorter = new LogSorter(this); + long nextSortLogsCheckTime = System.currentTimeMillis(); - while (!shutdown) { - - // mark compactor as idle while not in the compaction loop - updateIdleStatus(true); - - currentCompactionId.set(null); - err.set(null); - JOB_HOLDER.reset(); - - if (System.currentTimeMillis() > nextSortLogsCheckTime) { - // Attempt to process all existing log sorting work serially in this thread. - // When no work remains, this call will return so that we can look for compaction - // work. 
- LOG.debug("Checking to see if any recovery logs need sorting"); - nextSortLogsCheckTime = logSorter.sortLogsIfNeeded(); + while (!isShutdownRequested()) { + if (Thread.currentThread().isInterrupted()) { + LOG.info("Server process thread has been interrupted, shutting down"); + break; } - - TExternalCompactionJob job; try { - TNextCompactionJob next = getNextJob(getNextId()); - job = next.getJob(); - if (!job.isSetExternalCompactionId()) { - LOG.trace("No external compactions in group {}", this.getResourceGroup()); - UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); - continue; + // mark compactor as idle while not in the compaction loop + updateIdleStatus(true); + + currentCompactionId.set(null); + err.set(null); + JOB_HOLDER.reset(); + ++ if (System.currentTimeMillis() > nextSortLogsCheckTime) { ++ // Attempt to process all existing log sorting work serially in this thread. ++ // When no work remains, this call will return so that we can look for compaction ++ // work. ++ LOG.debug("Checking to see if any recovery logs need sorting"); ++ nextSortLogsCheckTime = logSorter.sortLogsIfNeeded(); + } - if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { - throw new IllegalStateException("Returned eci " + job.getExternalCompactionId() - + " does not match supplied eci " + currentCompactionId.get()); ++ + TExternalCompactionJob job; + try { + TNextCompactionJob next = getNextJob(getNextId()); + job = next.getJob(); + if (!job.isSetExternalCompactionId()) { - LOG.trace("No external compactions in queue {}", this.queueName); ++ LOG.trace("No external compactions in queue {}", this.getResourceGroup()); + UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); + continue; + } + if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { + throw new IllegalStateException("Returned eci " + job.getExternalCompactionId() + + " does not match supplied eci " + currentCompactionId.get()); + } + } catch (RetriesExceededException e2) { + LOG.warn("Retries exceeded getting next job. Retrying..."); + continue; } - } catch (RetriesExceededException e2) { - LOG.warn("Retries exceeded getting next job. 
Retrying..."); - continue; - } - LOG.debug("Received next compaction job: {}", job); + LOG.debug("Received next compaction job: {}", job); - final LongAdder totalInputEntries = new LongAdder(); - final LongAdder totalInputBytes = new LongAdder(); - final CountDownLatch started = new CountDownLatch(1); - final CountDownLatch stopped = new CountDownLatch(1); + final LongAdder totalInputEntries = new LongAdder(); + final LongAdder totalInputBytes = new LongAdder(); + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch stopped = new CountDownLatch(1); - final FileCompactorRunnable fcr = - createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); + final FileCompactorRunnable fcr = + createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); - final Thread compactionThread = - Threads.createThread("Compaction job for tablet " + job.getExtent().toString(), fcr); + final Thread compactionThread = + Threads.createThread("Compaction job for tablet " + job.getExtent().toString(), fcr); - JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); + JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); - try { - // mark compactor as busy while compacting - updateIdleStatus(false); - - // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set - fcr.initialize(); - - compactionThread.start(); // start the compactionThread - started.await(); // wait until the compactor is started - final long inputEntries = totalInputEntries.sum(); - final long waitTime = calculateProgressCheckTime(totalInputBytes.sum()); - LOG.debug("Progress checks will occur every {} seconds", waitTime); - String percentComplete = "unknown"; - - while (!stopped.await(waitTime, TimeUnit.SECONDS)) { - List<CompactionInfo> running = - org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions(); - if (!running.isEmpty()) { - // Compaction has started. 
There should only be one in the list - CompactionInfo info = running.get(0); - if (info != null) { - final long entriesRead = info.getEntriesRead(); - final long entriesWritten = info.getEntriesWritten(); - if (inputEntries > 0) { - percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); - } - String message = String.format( - "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries, paused %d times", - entriesRead, inputEntries, percentComplete, "%", entriesWritten, - info.getTimesPaused()); - watcher.run(); - try { - LOG.debug("Updating coordinator with compaction progress: {}.", message); - TCompactionStatusUpdate update = new TCompactionStatusUpdate( - TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead, - entriesWritten, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - } catch (RetriesExceededException e) { - LOG.warn("Error updating coordinator with compaction progress, error: {}", - e.getMessage()); + try { + // mark compactor as busy while compacting + updateIdleStatus(false); + + // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set + fcr.initialize(); + + compactionThread.start(); // start the compactionThread + started.await(); // wait until the compactor is started + final long inputEntries = totalInputEntries.sum(); + final long waitTime = calculateProgressCheckTime(totalInputBytes.sum()); + LOG.debug("Progress checks will occur every {} seconds", waitTime); + String percentComplete = "unknown"; + + while (!stopped.await(waitTime, TimeUnit.SECONDS)) { + List<CompactionInfo> running = + org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions(); + if (!running.isEmpty()) { + // Compaction has started. There should only be one in the list + CompactionInfo info = running.get(0); + if (info != null) { + final long entriesRead = info.getEntriesRead(); + final long entriesWritten = info.getEntriesWritten(); + if (inputEntries > 0) { + percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); + } + String message = String.format( + "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries", + entriesRead, inputEntries, percentComplete, "%", entriesWritten); + watcher.run(); + try { + LOG.debug("Updating coordinator with compaction progress: {}.", message); + TCompactionStatusUpdate update = new TCompactionStatusUpdate( + TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead, + entriesWritten, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + } catch (RetriesExceededException e) { + LOG.warn("Error updating coordinator with compaction progress, error: {}", + e.getMessage()); + } } + } else { + LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); } - } else { - LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); } - } - compactionThread.join(); - LOG.trace("Compaction thread finished."); - // Run the watcher again to clear out the finished compaction and set the - // stuck count to zero. 
- watcher.run(); - - if (err.get() != null) { - // maybe the error occured because the table was deleted or something like that, so - // force a cancel check to possibly reduce noise in the logs - checkIfCanceled(); - } - - if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() - || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { - LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); - try { - TCompactionStatusUpdate update = - new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled", - -1, -1, -1, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - updateCompactionFailed(job); - } catch (RetriesExceededException e) { - LOG.error("Error updating coordinator with compaction cancellation.", e); - } finally { - currentCompactionId.set(null); + compactionThread.join(); + LOG.trace("Compaction thread finished."); + // Run the watcher again to clear out the finished compaction and set the + // stuck count to zero. + watcher.run(); + + if (err.get() != null) { + // maybe the error occured because the table was deleted or something like that, so + // force a cancel check to possibly reduce noise in the logs + checkIfCanceled(); } - } else if (err.get() != null) { - KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); - try { - LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}", - job.getExternalCompactionId(), fromThriftExtent); - TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.FAILED, - "Compaction failed due to: " + err.get().getMessage(), -1, -1, -1, - fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - updateCompactionFailed(job); - } catch (RetriesExceededException e) { - LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}", - job.getExternalCompactionId(), fromThriftExtent, e); - } finally { - currentCompactionId.set(null); - } - } else { - try { - LOG.trace("Updating coordinator with compaction completion."); - updateCompactionCompleted(job, JOB_HOLDER.getStats()); - } catch (RetriesExceededException e) { - LOG.error( - "Error updating coordinator with compaction completion, cancelling compaction.", - e); + + if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() + || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { + LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); + try { + TCompactionStatusUpdate update = + new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled", + -1, -1, -1, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + updateCompactionFailed(job); + } catch (RetriesExceededException e) { + LOG.error("Error updating coordinator with compaction cancellation.", e); + } finally { + currentCompactionId.set(null); + } + } else if (err.get() != null) { + KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); + try { + LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}", + job.getExternalCompactionId(), fromThriftExtent); + TCompactionStatusUpdate update = new TCompactionStatusUpdate( + TCompactionState.FAILED, "Compaction failed due to: " + err.get().getMessage(), + -1, -1, -1, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + updateCompactionFailed(job); + } catch (RetriesExceededException e) { + LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}", + 
job.getExternalCompactionId(), fromThriftExtent, e); + } finally { + currentCompactionId.set(null); + } + } else { try { - cancel(job.getExternalCompactionId()); - } catch (TException e1) { - LOG.error("Error cancelling compaction.", e1); + LOG.trace("Updating coordinator with compaction completion."); + updateCompactionCompleted(job, JOB_HOLDER.getStats()); + } catch (RetriesExceededException e) { + LOG.error( + "Error updating coordinator with compaction completion, cancelling compaction.", + e); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e1) { + LOG.error("Error cancelling compaction.", e1); + } + } finally { + currentCompactionId.set(null); } - } finally { - currentCompactionId.set(null); } - } - } catch (RuntimeException e1) { - LOG.error( - "Compactor thread was interrupted waiting for compaction to start, cancelling job", - e1); - try { - cancel(job.getExternalCompactionId()); - } catch (TException e2) { - LOG.error("Error cancelling compaction.", e2); - } - } finally { - currentCompactionId.set(null); + } catch (RuntimeException e1) { + LOG.error( + "Compactor thread was interrupted waiting for compaction to start, cancelling job", + e1); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e2) { + LOG.error("Error cancelling compaction.", e2); + } + } finally { + currentCompactionId.set(null); - // mark compactor as idle after compaction completes - updateIdleStatus(true); + // mark compactor as idle after compaction completes + updateIdleStatus(true); - // In the case where there is an error in the foreground code the background compaction - // may still be running. Must cancel it before starting another iteration of the loop to - // avoid multiple threads updating shared state. - while (compactionThread.isAlive()) { - compactionThread.interrupt(); - compactionThread.join(1000); + // In the case where there is an error in the foreground code the background compaction + // may still be running. Must cancel it before starting another iteration of the loop to + // avoid multiple threads updating shared state. 
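+ // Interrupt in a loop because a single interrupt may be consumed before + // the compaction thread reaches an interruptible point.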
+ while (compactionThread.isAlive()) { + compactionThread.interrupt(); + compactionThread.join(1000); + } } + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + gracefulShutdown(getContext().rpcCreds()); } - - } - + } // end while } catch (Exception e) { LOG.error("Unhandled error occurred in Compactor", e); } finally { diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 62449eabb6,a9446397ed..4004326622 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@@ -195,143 -181,140 +195,159 @@@ public class SimpleGarbageCollector ext } }); - while (true) { - Span outerSpan = TraceUtil.startSpan(this.getClass(), "gc"); - try (Scope outerScope = outerSpan.makeCurrent()) { - Span innerSpan = TraceUtil.startSpan(this.getClass(), "loop"); - try (Scope innerScope = innerSpan.makeCurrent()) { - final long tStart = System.nanoTime(); - try { - System.gc(); // make room + while (!isShutdownRequested()) { + if (Thread.currentThread().isInterrupted()) { - LOG.info("Server process thread has been interrupted, shutting down"); ++ log.info("Server process thread has been interrupted, shutting down"); + break; + } + try { + Span outerSpan = TraceUtil.startSpan(this.getClass(), "gc"); + try (Scope outerScope = outerSpan.makeCurrent()) { + Span innerSpan = TraceUtil.startSpan(this.getClass(), "loop"); + try (Scope innerScope = innerSpan.makeCurrent()) { + final long tStart = System.nanoTime(); + try { + System.gc(); // make room + + status.current.started = System.currentTimeMillis(); + var rootGC = new GCRun(DataLevel.ROOT, getContext()); + var mdGC = new GCRun(DataLevel.METADATA, getContext()); + var userGC = new GCRun(DataLevel.USER, getContext()); + + log.info("Starting Root table Garbage Collection."); + status.current.bulks += new GarbageCollectionAlgorithm().collect(rootGC); + incrementStatsForRun(rootGC); + logStats(); + + log.info("Starting Metadata table Garbage Collection."); + status.current.bulks += new GarbageCollectionAlgorithm().collect(mdGC); + incrementStatsForRun(mdGC); + logStats(); + + log.info("Starting User table Garbage Collection."); + status.current.bulks += new GarbageCollectionAlgorithm().collect(userGC); + incrementStatsForRun(userGC); + logStats(); + + } catch (Exception e) { + TraceUtil.setException(innerSpan, e, false); + log.error("{}", e.getMessage(), e); + } finally { + status.current.finished = System.currentTimeMillis(); + status.last = status.current; + gcCycleMetrics.setLastCollect(status.current); + status.current = new GcCycleStats(); + } - status.current.started = System.currentTimeMillis(); - var rootGC = new GCRun(DataLevel.ROOT, getContext()); - var mdGC = new GCRun(DataLevel.METADATA, getContext()); - var userGC = new GCRun(DataLevel.USER, getContext()); + final long tStop = System.nanoTime(); + log.info(String.format("Collect cycle took %.2f seconds", + (TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0))); + + // Clean up any unused write-ahead logs + Span walSpan = TraceUtil.startSpan(this.getClass(), "walogs"); + try (Scope walScope = walSpan.makeCurrent()) { + GarbageCollectWriteAheadLogs walogCollector = + new GarbageCollectWriteAheadLogs(getContext(), fs, liveTServerSet); + log.info("Beginning garbage collection of write-ahead logs"); + walogCollector.collect(status); + gcCycleMetrics.setLastWalCollect(status.lastLog); + } catch (Exception e) { + 
TraceUtil.setException(walSpan, e, false); + log.error("{}", e.getMessage(), e); + } finally { + walSpan.end(); + } + } catch (Exception e) { + TraceUtil.setException(innerSpan, e, true); + throw e; + } finally { + innerSpan.end(); + } - log.info("Starting Root table Garbage Collection."); - status.current.bulks += new GarbageCollectionAlgorithm().collect(rootGC); - incrementStatsForRun(rootGC); - logStats(); + // we just made a lot of metadata changes: flush them out + try { + AccumuloClient accumuloClient = getContext(); + + final long actionStart = System.nanoTime(); + + String action = getConfiguration().get(Property.GC_USE_FULL_COMPACTION); + log.debug("gc post action {} started", action); + + switch (action) { + case "compact": + accumuloClient.tableOperations().compact(AccumuloTable.METADATA.name(), null, null, + true, true); + accumuloClient.tableOperations().compact(AccumuloTable.ROOT.name(), null, null, + true, true); + break; + case "flush": + accumuloClient.tableOperations().flush(AccumuloTable.METADATA.name(), null, null, + true); + accumuloClient.tableOperations().flush(AccumuloTable.ROOT.name(), null, null, true); + break; + default: + log.trace("'none - no action' or invalid value provided: {}", action); + } - log.info("Starting Metadata table Garbage Collection."); - status.current.bulks += new GarbageCollectionAlgorithm().collect(mdGC); - incrementStatsForRun(mdGC); - logStats(); + final long actionComplete = System.nanoTime(); - log.info("Starting User table Garbage Collection."); - status.current.bulks += new GarbageCollectionAlgorithm().collect(userGC); - incrementStatsForRun(userGC); - logStats(); + gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart); - } catch (Exception e) { - TraceUtil.setException(innerSpan, e, false); - log.error("{}", e.getMessage(), e); - } finally { - status.current.finished = System.currentTimeMillis(); - status.last = status.current; - gcCycleMetrics.setLastCollect(status.current); - status.current = new GcCycleStats(); - } + log.info("gc post action {} completed in {} seconds", action, String.format("%.2f", + (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 1000.0))); - final long tStop = System.nanoTime(); - log.info(String.format("Collect cycle took %.2f seconds", - (TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0))); - - // Clean up any unused write-ahead logs - Span walSpan = TraceUtil.startSpan(this.getClass(), "walogs"); - try (Scope walScope = walSpan.makeCurrent()) { - GarbageCollectWriteAheadLogs walogCollector = - new GarbageCollectWriteAheadLogs(getContext(), fs, liveTServerSet); - log.info("Beginning garbage collection of write-ahead logs"); - walogCollector.collect(status); - gcCycleMetrics.setLastWalCollect(status.lastLog); } catch (Exception e) { - TraceUtil.setException(walSpan, e, false); - log.error("{}", e.getMessage(), e); - } finally { - walSpan.end(); + TraceUtil.setException(outerSpan, e, false); + log.warn("{}", e.getMessage(), e); } } catch (Exception e) { - TraceUtil.setException(innerSpan, e, true); + TraceUtil.setException(outerSpan, e, true); throw e; } finally { - innerSpan.end(); + outerSpan.end(); } - - // we just made a lot of metadata changes: flush them out try { - AccumuloClient accumuloClient = getContext(); - - final long actionStart = System.nanoTime(); - - String action = getConfiguration().get(Property.GC_USE_FULL_COMPACTION); - log.debug("gc post action {} started", action); - - switch (action) { - case "compact": - 
accumuloClient.tableOperations().compact(AccumuloTable.METADATA.tableName(), null, - null, true, true); - accumuloClient.tableOperations().compact(AccumuloTable.ROOT.tableName(), null, null, - true, true); - break; - case "flush": - accumuloClient.tableOperations().flush(AccumuloTable.METADATA.tableName(), null, null, - true); - accumuloClient.tableOperations().flush(AccumuloTable.ROOT.tableName(), null, null, - true); - break; - default: - log.trace("'none - no action' or invalid value provided: {}", action); - } - final long actionComplete = System.nanoTime(); - - gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart); - - log.info("gc post action {} completed in {} seconds", action, String.format("%.2f", - (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 1000.0))); - - } catch (Exception e) { - TraceUtil.setException(outerSpan, e, false); - log.warn("{}", e.getMessage(), e); - } - } catch (Exception e) { - TraceUtil.setException(outerSpan, e, true); - throw e; - } finally { - outerSpan.end(); - } - try { - - gcCycleMetrics.incrementRunCycleCount(); - long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); - - if (lastCompactorCheck.hasElapsed(gcDelay * 3, MILLISECONDS)) { - Map<String,Set<TableId>> resourceMapping = new HashMap<>(); - for (TableId tid : AccumuloTable.allTableIds()) { - TableConfiguration tconf = getContext().getTableConfiguration(tid); - String resourceGroup = tconf.get(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY); - resourceGroup = - resourceGroup == null ? Constants.DEFAULT_RESOURCE_GROUP_NAME : resourceGroup; - resourceMapping.getOrDefault(resourceGroup, new HashSet<>()).add(tid); - } - for (Entry<String,Set<TableId>> e : resourceMapping.entrySet()) { - if (ExternalCompactionUtil.countCompactors(e.getKey(), getContext()) == 0) { - log.warn("No Compactors exist in resource group {} for system table {}", e.getKey(), - e.getValue()); + gcCycleMetrics.incrementRunCycleCount(); + long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); ++ ++ if (lastCompactorCheck.hasElapsed(gcDelay * 3, MILLISECONDS)) { ++ Map<String,Set<TableId>> resourceMapping = new HashMap<>(); ++ for (TableId tid : AccumuloTable.allTableIds()) { ++ TableConfiguration tconf = getContext().getTableConfiguration(tid); ++ String resourceGroup = tconf.get(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY); ++ resourceGroup = ++ resourceGroup == null ? Constants.DEFAULT_RESOURCE_GROUP_NAME : resourceGroup; ++ resourceMapping.getOrDefault(resourceGroup, new HashSet<>()).add(tid); ++ } ++ for (Entry<String,Set<TableId>> e : resourceMapping.entrySet()) { ++ if (ExternalCompactionUtil.countCompactors(e.getKey(), getContext()) == 0) { ++ log.warn("No Compactors exist in resource group {} for system table {}", e.getKey(), ++ e.getValue()); ++ } + } ++ lastCompactorCheck.restart(); + } - lastCompactorCheck.restart(); - } + - log.debug("Sleeping for {} milliseconds", gcDelay); - Thread.sleep(gcDelay); + log.debug("Sleeping for {} milliseconds", gcDelay); + Thread.sleep(gcDelay); + } catch (InterruptedException e) { + log.warn("{}", e.getMessage(), e); + throw e; + } } catch (InterruptedException e) { - log.warn("{}", e.getMessage(), e); - return; + log.info("Interrupt Exception received, shutting down"); + gracefulShutdown(getContext().rpcCreds()); } } + getShutdownComplete().set(true); + log.info("stop requested. exiting ... 
"); + try { + gcLock.unlock(); + } catch (Exception e) { + log.warn("Failed to release GarbageCollector lock", e); + } + } private void incrementStatsForRun(GCRun gcRun) { @@@ -371,11 -354,12 +387,12 @@@ UUID zooLockUUID = UUID.randomUUID(); gcLock = new ServiceLock(getContext().getZooSession(), path, zooLockUUID); - HAServiceLockWatcher gcLockWatcher = new HAServiceLockWatcher(Type.GARBAGE_COLLECTOR); + HAServiceLockWatcher gcLockWatcher = - new HAServiceLockWatcher("gc", () -> getShutdownComplete().get()); ++ new HAServiceLockWatcher(Type.GARBAGE_COLLECTOR, () -> getShutdownComplete().get()); while (true) { - gcLock.lock(gcLockWatcher, - new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC)); + gcLock.lock(gcLockWatcher, new ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC, + this.getResourceGroup())); gcLockWatcher.waitForChange(); diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index eb22227bb9,1fa3de8c88..48a049fcf9 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -1136,8 -1243,8 +1136,8 @@@ public class Manager extends AbstractSe HighlyAvailableServiceWrapper.service(managerClientHandler, this); ServerAddress sa; - var processor = ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, - var processor = - ThriftProcessorTypes.getManagerTProcessor(this, fateServiceHandler, haProxy, getContext()); ++ var processor = ThriftProcessorTypes.getManagerTProcessor(this, fateServiceHandler, + compactionCoordinator.getThriftService(), haProxy, getContext()); try { sa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_CLIENTPORT, processor, @@@ -1335,14 -1423,25 +1335,27 @@@ // The manager is fully initialized. Clients are allowed to connect now. managerInitialized.set(true); - while (clientService.isServing()) { - sleepUninterruptibly(500, MILLISECONDS); + while (!isShutdownRequested() && clientService.isServing()) { + if (Thread.currentThread().isInterrupted()) { - LOG.info("Server process thread has been interrupted, shutting down"); ++ log.info("Server process thread has been interrupted, shutting down"); + break; + } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + log.info("Interrupt Exception received, shutting down"); + gracefulShutdown(context.rpcCreds()); + } } - log.info("Shutting down fate."); + - LOG.debug("Stopping Thrift Servers"); - sa.server.stop(); - + log.debug("Shutting down fate."); - fate().shutdown(); + getFateRefs().keySet().forEach(type -> fate(type).shutdown(0, MINUTES)); + + splitter.stop(); + ++ log.debug("Stopping Thrift Servers"); ++ sa.server.stop(); + final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME; try { statusThread.join(remaining(deadline)); @@@ -1376,19 -1472,18 +1389,25 @@@ throw new IllegalStateException("Exception waiting on watcher", e); } } - log.info("exiting"); + getShutdownComplete().set(true); + log.info("stop requested. exiting ... 
"); + try { + managerLock.unlock(); + } catch (Exception e) { + log.warn("Failed to release Manager lock", e); + } } - protected Fate<Manager> initializeFateInstance(TStore<Manager> store, - AccumuloConfiguration conf) { - return new Fate<>(this, store, TraceRepo::toLogString, conf); + protected Fate<Manager> initializeFateInstance(ServerContext context, FateStore<Manager> store) { + + final Fate<Manager> fateInstance = + new Fate<>(this, store, true, TraceRepo::toLogString, getConfiguration()); + + var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), this::getSteadyTime); + ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() + .scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES)); + + return fateInstance; } /** @@@ -1496,13 -1591,12 +1515,14 @@@ getHostname() + ":" + getConfiguration().getPort(Property.MANAGER_CLIENTPORT)[0]; UUID zooLockUUID = UUID.randomUUID(); - ServiceLockData sld = - new ServiceLockData(zooLockUUID, managerClientAddress, ThriftService.MANAGER); + ServiceDescriptors descriptors = new ServiceDescriptors(); + descriptors.addService(new ServiceDescriptor(zooLockUUID, ThriftService.MANAGER, + managerClientAddress, this.getResourceGroup())); + ServiceLockData sld = new ServiceLockData(descriptors); managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID); - HAServiceLockWatcher managerLockWatcher = new HAServiceLockWatcher(Type.MANAGER); + HAServiceLockWatcher managerLockWatcher = - new HAServiceLockWatcher("manager", () -> getShutdownComplete().get()); ++ new HAServiceLockWatcher(Type.MANAGER, () -> getShutdownComplete().get()); while (true) { diff --cc server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index 2bf22a2841,1222ceba87..94ddc573f1 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@@ -331,6 -327,40 +332,17 @@@ public class ManagerClientServiceHandle log.debug("FATE op shutting down " + tabletServer + " finished"); } + @Override + public void tabletServerStopping(TInfo tinfo, TCredentials credentials, String tabletServer) + throws ThriftSecurityException, ThriftNotActiveServiceException, TException { + if (!manager.security.canPerformSystemActions(credentials)) { + throw new ThriftSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED); + } + log.info("Tablet Server {} has reported it's shutting down", tabletServer); + manager.tserverSet.tabletServerShuttingDown(tabletServer); + } + - @Override - public void reportSplitExtent(TInfo info, TCredentials credentials, String serverName, - TabletSplit split) throws ThriftSecurityException { - if (!manager.security.canPerformSystemActions(credentials)) { - throw new ThriftSecurityException(credentials.getPrincipal(), - SecurityErrorCode.PERMISSION_DENIED); - } - - KeyExtent oldTablet = KeyExtent.fromThrift(split.oldTablet); - if (manager.migrations.remove(oldTablet) != null) { - Manager.log.info("Canceled migration of {}", split.oldTablet); - } - for (TServerInstance instance : manager.tserverSet.getCurrentServers()) { - if (serverName.equals(instance.getHostPort())) { - manager.nextEvent.event("%s reported split %s, %s", serverName, - KeyExtent.fromThrift(split.newTablets.get(0)), - KeyExtent.fromThrift(split.newTablets.get(1))); - return; - } - } - Manager.log.warn("Got a split from a server we don't recognize: {}", serverName); - } - @Override public void 
reportTabletStatus(TInfo info, TCredentials credentials, String serverName, TabletLoadState status, TKeyExtent ttablet) throws ThriftSecurityException { diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index ac142dc671,b2e60a04bc..4bcb06a552 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@@ -423,6 -422,22 +423,22 @@@ public class Monitor extends AbstractSe }).start(); monitorInitialized.set(true); + + while (!isShutdownRequested()) { + if (Thread.currentThread().isInterrupted()) { - LOG.info("Server process thread has been interrupted, shutting down"); ++ log.info("Server process thread has been interrupted, shutting down"); + break; + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { - LOG.info("Interrupt Exception received, shutting down"); ++ log.info("Interrupt Exception received, shutting down"); + gracefulShutdown(context.rpcCreds()); + } + } + + server.stop(); + log.info("stop requested. exiting ... "); } private ServletHolder getDefaultServlet() { @@@ -718,13 -737,12 +734,14 @@@ // Get a ZooLock for the monitor UUID zooLockUUID = UUID.randomUUID(); monitorLock = new ServiceLock(context.getZooSession(), monitorLockPath, zooLockUUID); - HAServiceLockWatcher monitorLockWatcher = new HAServiceLockWatcher(Type.MONITOR); + HAServiceLockWatcher monitorLockWatcher = - new HAServiceLockWatcher("monitor", () -> isShutdownRequested()); ++ new HAServiceLockWatcher(Type.MONITOR, () -> isShutdownRequested()); while (true) { - monitorLock.lock(monitorLockWatcher, new ServiceLockData(zooLockUUID, - monitorLocation.getHost() + ":" + monitorLocation.getPort(), ThriftService.NONE)); + monitorLock.lock(monitorLockWatcher, + new ServiceLockData(zooLockUUID, + monitorLocation.getHost() + ":" + monitorLocation.getPort(), ThriftService.NONE, + this.getResourceGroup())); monitorLockWatcher.waitForChange(); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 8a409d566b,2ebaab6fe3..1a00514f7f --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@@ -303,9 -301,10 +302,9 @@@ public class ScanServer extends Abstrac // This class implements TabletClientService.Iface and then delegates calls. Be sure // to set up the ThriftProcessor using this class, not the delegate. 
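
The loop in the Monitor hunk above is the same shutdown idiom the ScanServer and TabletServer hunks below converge on: poll the shutdown flag, treat a thread interrupt as a stop signal, and turn an InterruptedException into a graceful shutdown request rather than an abrupt exit. A minimal sketch of the shared pattern, assuming only the AbstractServer methods visible in these hunks (the enclosing server class here is hypothetical):

    // Common server main loop; names as they appear in the hunks in this commit.
    while (!isShutdownRequested()) {
      if (Thread.currentThread().isInterrupted()) {
        log.info("Server process thread has been interrupted, shutting down");
        break;
      }
      try {
        Thread.sleep(1000); // Monitor and ScanServer poll at 1s, the Manager at 500ms
      } catch (InterruptedException e) {
        log.info("Interrupt Exception received, shutting down");
        gracefulShutdown(getContext().rpcCreds()); // request an orderly stop
      }
    }
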
- ClientServiceHandler clientHandler = - new ClientServiceHandler(context, new TransactionWatcher(context)); + ClientServiceHandler clientHandler = new ClientServiceHandler(context); TProcessor processor = - ThriftProcessorTypes.getScanServerTProcessor(clientHandler, this, getContext()); + ThriftProcessorTypes.getScanServerTProcessor(this, clientHandler, this, getContext()); ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(), Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(), @@@ -327,16 -326,27 +326,16 @@@ * Set up nodes and locks in ZooKeeper for this Compactor */ private ServiceLock announceExistence() { - ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter(); + final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter(); try { - var zLockPath = ServiceLock.path( - getContext().getZooKeeperRoot() + Constants.ZSSERVERS + "/" + getClientAddressString()); - - try { - // Old zk nodes can be cleaned up by ZooZap - zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP); - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.NOAUTH) { - LOG.error("Failed to write to ZooKeeper. Ensure that" - + " accumulo.properties, specifically instance.secret, is consistent."); - } - throw e; - } - + final ServiceLockPath zLockPath = + context.getServerPaths().createScanServerPath(getResourceGroup(), clientAddress); + ServiceLockSupport.createNonHaServiceLockPath(Type.SCAN_SERVER, zoo, zLockPath); serverLockUUID = UUID.randomUUID(); scanServerLock = new ServiceLock(getContext().getZooSession(), zLockPath, serverLockUUID); - LockWatcher lw = new ServiceLockWatcher(Type.SCAN_SERVER, () -> serverStopRequested, - LockWatcher lw = new ServiceLockWatcher("scan server", () -> getShutdownComplete().get(), - (name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); ++ LockWatcher lw = new ServiceLockWatcher(Type.SCAN_SERVER, () -> getShutdownComplete().get(), + (type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); @@@ -390,37 -400,40 +389,57 @@@ // We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close ServiceLock lock = announceExistence(); + this.getContext().setServiceLock(lock); + + int threadPoolSize = getConfiguration().getCount(Property.SSERV_WAL_SORT_MAX_CONCURRENT); + if (threadPoolSize > 0) { - final LogSorter logSorter = new LogSorter(context, getConfiguration()); ++ final LogSorter logSorter = new LogSorter(this); + try { + // Attempt to process all existing log sorting work and start a background + // thread to look for log sorting work in the future - logSorter.startWatchingForRecoveryLogs(threadPoolSize); ++ logSorter.startWatchingForRecoveryLogs(); + } catch (Exception ex) { + LOG.error("Error starting LogSorter"); + throw new RuntimeException(ex); + } + } else { + LOG.info( + "Log sorting for tablet recovery is disabled, SSERV_WAL_SORT_MAX_CONCURRENT is less than 1."); + } try { - while (!serverStopRequested) { - UtilWaitThread.sleep(1000); - updateIdleStatus( - sessionManager.getActiveScans().isEmpty() && tabletMetadataCache.estimatedSize() == 0); + while (!isShutdownRequested()) { + if (Thread.currentThread().isInterrupted()) { + LOG.info("Server process thread has been interrupted, shutting down"); + break; + } + try { + Thread.sleep(1000); + updateIdleStatus(sessionManager.getActiveScans().isEmpty() + 
&& tabletMetadataCache.estimatedSize() == 0); + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + gracefulShutdown(getContext().rpcCreds()); + } } } finally { - LOG.info("Stopping Thrift Servers"); + // Wait for scans to got to zero + while (!sessionManager.getActiveScans().isEmpty()) { + LOG.debug("Waiting on {} active scans to complete.", + sessionManager.getActiveScans().size()); + UtilWaitThread.sleep(1000); + } + + LOG.debug("Stopping Thrift Servers"); address.server.stop(); - LOG.info("Removing server scan references"); - this.getContext().getAmple().scanServerRefs().delete(clientAddress.toString(), - serverLockUUID); + try { + LOG.info("Removing server scan references"); + this.getContext().getAmple().scanServerRefs().delete(clientAddress.toString(), + serverLockUUID); + } catch (Exception e) { + LOG.warn("Failed to remove scan server refs from metadata location", e); + } try { LOG.debug("Closing filesystems"); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index aea44fcdaf,333e35d542..4686460c72 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -94,26 -93,29 +96,32 @@@ import org.apache.accumulo.core.manager import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; + import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsInfo; -import org.apache.accumulo.core.process.thrift.ServerProcessService; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; +import org.apache.accumulo.core.spi.ondemand.OnDemandTabletUnloader; +import org.apache.accumulo.core.spi.ondemand.OnDemandTabletUnloader.UnloaderParams; + import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; +import org.apache.accumulo.core.tabletserver.UnloaderParamsImpl; import org.apache.accumulo.core.tabletserver.log.LogEntry; + import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.ComparablePair; + import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.MapCounter; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.Retry.RetryFactory; import org.apache.accumulo.core.util.UtilWaitThread; + import org.apache.accumulo.core.util.threads.ThreadPoolNames; import org.apache.accumulo.core.util.threads.Threads; + import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.ServiceEnvironmentImpl; import org.apache.accumulo.server.TabletLevel; import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.compaction.CompactionWatcher; @@@ -238,7 -255,7 +243,7 @@@ public class TabletServer extends Abstr log.info("Version " + Constants.VERSION); log.info("Instance " + getInstanceID()); this.sessionManager = new SessionManager(context); -- this.logSorter = new 
LogSorter(context, aconf); ++ this.logSorter = new LogSorter(this); this.statsKeeper = new TabletStatsKeeper(); final int numBusyTabletsToLog = aconf.getCount(Property.TSERV_LOG_BUSY_TABLETS_COUNT); final long logBusyTabletsDelay = @@@ -375,9 -390,22 +380,9 @@@ void requestStop() { log.info("Stop requested."); - serverStopRequested = true; + gracefulShutdown(getContext().rpcCreds()); } - private class SplitRunner implements Runnable { - private final Tablet tablet; - - public SplitRunner(Tablet tablet) { - this.tablet = tablet; - } - - @Override - public void run() { - splitTablet(tablet); - } - } - public long updateTotalQueuedMutationSize(long additionalMutationSize) { var newTotal = totalQueuedMutationSize.addAndGet(additionalMutationSize); if (log.isTraceEnabled()) { @@@ -498,8 -639,8 +503,8 @@@ UUID tabletServerUUID = UUID.randomUUID(); tabletServerLock = new ServiceLock(getContext().getZooSession(), zLockPath, tabletServerUUID); - LockWatcher lw = new ServiceLockWatcher(Type.TABLET_SERVER, () -> serverStopRequested, - LockWatcher lw = new ServiceLockWatcher("tablet server", () -> getShutdownComplete().get(), - (name) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); ++ LockWatcher lw = new ServiceLockWatcher(Type.TABLET_SERVER, () -> getShutdownComplete().get(), + (type) -> context.getLowMemoryDetector().logGCInfo(getConfiguration())); for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); @@@ -583,32 -729,73 +588,36 @@@ throw new RuntimeException(e); } - try { - logSorter.startWatchingForRecoveryLogs(this); - } catch (Exception ex) { - log.error("Error setting watches for recoveries"); - throw new RuntimeException(ex); + int threadPoolSize = + getContext().getConfiguration().getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT); + if (threadPoolSize > 0) { + try { + // Attempt to process all existing log sorting work and start a background + // thread to look for log sorting work in the future - logSorter.startWatchingForRecoveryLogs(threadPoolSize); ++ logSorter.startWatchingForRecoveryLogs(); + } catch (Exception ex) { + log.error("Error starting LogSorter"); + throw new RuntimeException(ex); + } + } else { + log.info( + "Log sorting for tablet recovery is disabled, TSERV_WAL_SORT_MAX_CONCURRENT is less than 1."); } - final AccumuloConfiguration aconf = getConfiguration(); - - long tabletCheckFrequency = aconf.getTimeInMillis(Property.TSERV_HEALTH_CHECK_FREQ); - // Periodically check that metadata of tablets matches what is held in memory - watchCriticalFixedDelay(aconf, tabletCheckFrequency, () -> { - final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot(); - - Map<KeyExtent,MetadataUpdateCount> updateCounts = new HashMap<>(); - // gather updateCounts for each tablet before reading tablet metadata - onlineTabletsSnapshot.forEach((ke, tablet) -> { - updateCounts.put(ke, tablet.getUpdateCount()); - }); - - Instant start = Instant.now(); - Duration duration; - Span mdScanSpan = TraceUtil.startSpan(this.getClass(), "metadataScan"); - try (Scope scope = mdScanSpan.makeCurrent()) { - List<KeyExtent> missingTablets = new ArrayList<>(); - // gather metadata for all tablets readTablets() - try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets() - .forTablets(onlineTabletsSnapshot.keySet(), Optional.of(missingTablets::add)) - .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) { - duration = Duration.between(start, Instant.now()); - log.debug("Metadata scan took {}ms for {} 
tablets read.", duration.toMillis(), - onlineTabletsSnapshot.keySet().size()); - - // for each tablet, compare its metadata to what is held in memory - for (var tabletMetadata : tabletsMetadata) { - KeyExtent extent = tabletMetadata.getExtent(); - Tablet tablet = onlineTabletsSnapshot.get(extent); - MetadataUpdateCount counter = updateCounts.get(extent); - tablet.compareTabletInfo(counter, tabletMetadata); - } + final AccumuloConfiguration aconf = getConfiguration(); - for (var extent : missingTablets) { - Tablet tablet = onlineTabletsSnapshot.get(extent); - if (!tablet.isClosed()) { - log.error("Tablet {} is open but does not exist in metadata table.", extent); - } - } - } - } catch (Exception e) { - log.error("Unable to complete verification of tablet metadata", e); - TraceUtil.setException(mdScanSpan, e, true); - } finally { - mdScanSpan.end(); - } + final long onDemandUnloaderInterval = + aconf.getTimeInMillis(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL); + watchCriticalFixedDelay(aconf, onDemandUnloaderInterval, () -> { + evaluateOnDemandTabletsForUnload(); }); - final long CLEANUP_BULK_LOADED_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(15); - watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( - new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS, - CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS)); - HostAndPort managerHost; - while (!serverStopRequested) { + while (!isShutdownRequested()) { + if (Thread.currentThread().isInterrupted()) { - LOG.info("Server process thread has been interrupted, shutting down"); ++ log.info("Server process thread has been interrupted, shutting down"); + break; + } updateIdleStatus(getOnlineTablets().isEmpty()); @@@ -673,20 -860,77 +682,77 @@@ } } - // wait for shutdown - // if the main thread exits oldServer the manager listener, the JVM will - // kill the other threads and finalize objects. We want the shutdown that is - // running in the manager listener thread to complete oldServer this happens. - // consider making other threads daemon threads so that objects don't - // get prematurely finalized - synchronized (this) { - while (!shutdownComplete) { + // Tell the Manager we are shutting down so that it doesn't try + // to assign tablets. + ManagerClientService.Client iface = managerConnection(getManagerAddress()); + try { + iface.tabletServerStopping(TraceUtil.traceInfo(), getContext().rpcCreds(), + getClientAddressString()); + } catch (TException e) { - LOG.error("Error informing Manager that we are shutting down, halting server", e); ++ log.error("Error informing Manager that we are shutting down, halting server", e); + Halt.halt("Error informing Manager that we are shutting down, exiting!", -1); + } finally { + returnManagerConnection(iface); + } + + // Best-effort attempt at unloading tablets. 
+ log.debug("Unloading tablets"); + final List<Future<?>> futures = new ArrayList<>(); + final ThreadPoolExecutor tpe = getContext().threadPools() + .getPoolBuilder(ThreadPoolNames.TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL).numCoreThreads(8) + .numMaxThreads(16).build(); + + iface = managerConnection(getManagerAddress()); + boolean managerDown = false; + + try { + for (DataLevel level : new DataLevel[] {DataLevel.USER, DataLevel.METADATA, DataLevel.ROOT}) { + getOnlineTablets().keySet().forEach(ke -> { + if (DataLevel.of(ke.tableId()) == level) { + futures.add(tpe.submit(new UnloadTabletHandler(this, ke, TUnloadTabletGoal.UNASSIGNED, + SteadyTime.from(System.currentTimeMillis(), TimeUnit.MILLISECONDS)))); + } + }); + while (!futures.isEmpty()) { + Iterator<Future<?>> unloads = futures.iterator(); + while (unloads.hasNext()) { + Future<?> f = unloads.next(); + if (f.isDone()) { + if (!managerDown) { + ManagerMessage mm = managerMessages.poll(); + try { + mm.send(getContext().rpcCreds(), getClientAddressString(), iface); + } catch (TException e) { + managerDown = true; - LOG.debug("Error sending message to Manager during tablet unloading, msg: {}", ++ log.debug("Error sending message to Manager during tablet unloading, msg: {}", + e.getMessage()); + } + } + unloads.remove(); + } + } + log.debug("Waiting on {} {} tablets to close.", futures.size(), level); + UtilWaitThread.sleep(1000); + } + log.debug("All {} tablets unloaded", level); + } + } finally { + if (!managerDown) { try { - this.wait(1000); - } catch (InterruptedException e) { - log.error(e.toString()); + ManagerMessage mm = managerMessages.poll(); + do { + if (mm != null) { + mm.send(getContext().rpcCreds(), getClientAddressString(), iface); + } + mm = managerMessages.poll(); + } while (mm != null); + } catch (TException e) { - LOG.debug("Error sending message to Manager during tablet unloading, msg: {}", ++ log.debug("Error sending message to Manager during tablet unloading, msg: {}", + e.getMessage()); } } + returnManagerConnection(iface); + tpe.shutdown(); } log.debug("Stopping Thrift Servers"); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index 1fdae2890e,1810e04791..74fa6680be --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@@ -222,15 -222,15 +223,17 @@@ public class LogSorter } } ++ private final AbstractServer server; private final ServerContext context; private final AccumuloConfiguration conf; private final double walBlockSize; private final CryptoService cryptoService; private final AccumuloConfiguration sortedLogConf; -- public LogSorter(ServerContext context, AccumuloConfiguration conf) { -- this.context = context; -- this.conf = conf; ++ public LogSorter(AbstractServer server) { ++ this.server = server; ++ this.context = this.server.getContext(); ++ this.conf = this.context.getConfiguration(); this.sortedLogConf = extractSortedLogConfig(this.conf); this.walBlockSize = DfsLogger.getWalBlockSize(this.conf); CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY); @@@ -292,31 -292,14 +295,31 @@@ } } - public void startWatchingForRecoveryLogs(AbstractServer server) - throws KeeperException, InterruptedException { + /** + * Sort any logs that need sorting in the current thread. + * + * @return The time in millis when the next check can be done. 
+ */ + public long sortLogsIfNeeded() throws KeeperException, InterruptedException { + DistributedWorkQueue dwq = new DistributedWorkQueue( - context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, context); ++ context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, server); + dwq.processExistingWork(new LogProcessor(), MoreExecutors.newDirectExecutorService(), 1, false); + return System.currentTimeMillis() + dwq.getCheckInterval(); + } + + /** + * Sort any logs that need sorting in a ThreadPool using + * {@link Property#TSERV_WAL_SORT_MAX_CONCURRENT} threads. This method will start a background + * thread to look for log sorting work in the future that will be processed by the + * ThreadPoolExecutor + */ - public void startWatchingForRecoveryLogs(int threadPoolSize) - throws KeeperException, InterruptedException { ++ public void startWatchingForRecoveryLogs() throws KeeperException, InterruptedException { + int threadPoolSize = this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT); ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_SORT_CONCURRENT_POOL) .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build(); new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, - context).processExistingAndFuture(new LogProcessor(), threadPool); - server).startProcessing(new LogProcessor(), threadPool); ++ server).processExistingAndFuture(new LogProcessor(), threadPool); } public List<RecoveryStatus> getLogSorts() { diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java index dd9b689229,dd9b689229..3d070c983b --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java @@@ -23,6 -23,6 +23,7 @@@ import static org.apache.accumulo.tserv import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; ++import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@@ -48,6 -48,6 +49,7 @@@ import org.apache.accumulo.server.Serve import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.log.SortedLogState; ++import org.apache.accumulo.tserver.TabletServer; import org.apache.accumulo.tserver.WithTestNames; import org.apache.accumulo.tserver.logger.LogFileKey; import org.apache.accumulo.tserver.logger.LogFileValue; @@@ -66,6 -66,6 +68,7 @@@ public class RecoveryLogsIteratorTest e private VolumeManager fs; private File workDir; static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null); ++ static TabletServer server; static ServerContext context; static LogSorter logSorter; @@@ -75,21 -75,21 +78,23 @@@ @BeforeEach public void setUp() throws Exception { context = createMock(ServerContext.class); -- ++ server = createMock(TabletServer.class); workDir = new File(tempDir, testName()); String path = workDir.getAbsolutePath(); fs = VolumeManagerImpl.getLocalForTesting(path); ++ expect(server.getContext()).andReturn(context).anyTimes(); expect(context.getCryptoFactory()).andReturn(new GenericCryptoServiceFactory()).anyTimes(); expect(context.getVolumeManager()).andReturn(fs).anyTimes(); 
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); -- replay(context); ++ replay(server, context); -- logSorter = new LogSorter(context, DefaultConfiguration.getInstance()); ++ logSorter = new LogSorter(server); } @AfterEach public void tearDown() throws Exception { fs.close(); ++ verify(server, context); } static class KeyValue implements Comparable<KeyValue> { diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java index 5645dea90f,5645dea90f..bffb83f594 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java @@@ -70,6 -70,6 +70,7 @@@ import org.apache.accumulo.server.data. import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.log.SortedLogState; ++import org.apache.accumulo.tserver.TabletServer; import org.apache.accumulo.tserver.WithTestNames; import org.apache.accumulo.tserver.logger.LogEvents; import org.apache.accumulo.tserver.logger.LogFileKey; @@@ -96,6 -96,6 +97,7 @@@ public class SortedLogRecoveryTest exte static final Text cf = new Text("cf"); static final Text cq = new Text("cq"); static final Value value = new Value("value"); ++ static TabletServer server; static ServerContext context; static LogSorter logSorter; @@@ -104,6 -104,6 +106,7 @@@ @BeforeEach public void setup() { ++ server = EasyMock.createMock(TabletServer.class); context = EasyMock.createMock(ServerContext.class); } @@@ -186,12 -186,12 +189,12 @@@ final String workdir = new File(tempDir, testName()).getAbsolutePath(); try (var fs = VolumeManagerImpl.getLocalForTesting(workdir)) { CryptoServiceFactory cryptoFactory = new GenericCryptoServiceFactory(); -- ++ expect(server.getContext()).andReturn(context).anyTimes(); expect(context.getVolumeManager()).andReturn(fs).anyTimes(); expect(context.getCryptoFactory()).andReturn(cryptoFactory).anyTimes(); expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); -- replay(context); -- logSorter = new LogSorter(context, DefaultConfiguration.getInstance()); ++ replay(server, context); ++ logSorter = new LogSorter(server); final Path workdirPath = new Path("file://" + workdir); fs.deleteRecursively(workdirPath); @@@ -223,7 -223,7 +226,7 @@@ SortedLogRecovery recovery = new SortedLogRecovery(context, fileLenCache, cacheProvider); CaptureMutations capture = new CaptureMutations(); recovery.recover(extent, dirs, files, capture); -- verify(context); ++ verify(server, context); return capture.result; } } @@@ -809,7 -809,7 +812,7 @@@ assertEquals(1, mutations1.size()); assertEquals(m2, mutations1.get(0)); -- reset(context); ++ reset(server, context); List<Mutation> mutations2 = recover(logs, e2); assertEquals(2, mutations2.size()); assertEquals(m3, mutations2.get(0)); @@@ -820,7 -820,7 +823,7 @@@ Arrays.sort(entries2); logs.put("entries2", entries2); -- reset(context); ++ reset(server, context); mutations2 = recover(logs, e2); assertEquals(1, mutations2.size()); assertEquals(m4, mutations2.get(0)); @@@ -860,7 -860,7 +863,7 @@@ // test having different paths for the same file. 
This can happen as a result of upgrade or user // changing configuration runPathTest(false, "/t1/f1", "/t1/f0"); -- reset(context); ++ reset(server, context); runPathTest(true, "/t1/f1", "/t1/f0", "/t1/f1"); String[] aliases = {"/t1/f1", "hdfs://nn1/accumulo/tables/8/t1/f1", @@@ -871,12 -871,12 +874,12 @@@ for (String alias1 : aliases) { for (String alias2 : aliases) { -- reset(context); ++ reset(server, context); runPathTest(true, alias1, alias2); for (String other : others) { -- reset(context); ++ reset(server, context); runPathTest(true, alias1, other, alias2); -- reset(context); ++ reset(server, context); runPathTest(true, alias1, alias2, other); } } @@@ -884,7 -884,7 +887,7 @@@ for (String alias1 : aliases) { for (String other : others) { -- reset(context); ++ reset(server, context); runPathTest(false, alias1, other); } } @@@ -1035,34 -1035,34 +1038,34 @@@ logs.put("entries2", entries2); -- reset(context); ++ reset(server, context); mutations = recover(logs, extent); assertEquals(1, mutations.size()); assertEquals(m1, mutations.get(0)); logs.put("entries3", entries3); -- reset(context); ++ reset(server, context); mutations = recover(logs, extent); assertEquals(1, mutations.size()); assertEquals(m1, mutations.get(0)); logs.put("entries4", entries4); -- reset(context); ++ reset(server, context); mutations = recover(logs, extent); assertEquals(1, mutations.size()); assertEquals(m1, mutations.get(0)); logs.put("entries5", entries5); -- reset(context); ++ reset(server, context); mutations = recover(logs, extent); assertEquals(0, mutations.size()); logs.put("entries6", entries6); -- reset(context); ++ reset(server, context); mutations = recover(logs, extent); assertEquals(1, mutations.size()); assertEquals(m2, mutations.get(0)); @@@ -1098,8 -1098,8 +1101,12 @@@ // test all the possible properties for tserver.sort.file. 
prefix String prop = Property.TSERV_WAL_SORT_FILE_PREFIX + "invalid"; testConfig.set(prop, "snappy"); -- assertThrows(IllegalArgumentException.class, () -> new LogSorter(context, testConfig), ++ expect(server.getContext()).andReturn(context).anyTimes(); ++ expect(context.getConfiguration()).andReturn(testConfig).anyTimes(); ++ replay(server, context); ++ assertThrows(IllegalArgumentException.class, () -> new LogSorter(server), "Did not throw IllegalArgumentException for " + prop); ++ verify(server, context); } @Test @@@ -1122,11 -1122,11 +1129,12 @@@ try (var vm = VolumeManagerImpl.getLocalForTesting(workdir)) { CryptoServiceFactory cryptoFactory = new GenericCryptoServiceFactory(); ++ expect(server.getContext()).andReturn(context).anyTimes(); expect(context.getCryptoFactory()).andReturn(cryptoFactory).anyTimes(); expect(context.getVolumeManager()).andReturn(vm).anyTimes(); -- expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); -- replay(context); -- LogSorter sorter = new LogSorter(context, testConfig); ++ expect(context.getConfiguration()).andReturn(testConfig).anyTimes(); ++ replay(server, context); ++ LogSorter sorter = new LogSorter(server); final Path workdirPath = new Path("file://" + workdir); vm.deleteRecursively(workdirPath); diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java index 8aab03b5c6,8aab03b5c6..cd7d3bf29f --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java @@@ -33,12 -33,12 +33,12 @@@ import java.io.IOException import java.io.InputStream; import java.io.OutputStream; --import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; ++import org.apache.accumulo.tserver.TabletServer; import org.apache.accumulo.tserver.WithTestNames; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.Path; @@@ -60,8 -60,8 +60,8 @@@ public class TestUpgradePathForWALogs e // logs from 2.0 were changed for improved crypto private static final String WALOG_FROM_20 = "/walog-from-20.walog"; -- private static final AccumuloConfiguration config = DefaultConfiguration.getInstance(); private ServerContext context; ++ private TabletServer server; @TempDir private static File tempDir; @@@ -71,6 -71,6 +71,7 @@@ @BeforeEach public void setUp() throws Exception { context = createMock(ServerContext.class); ++ server = createMock(TabletServer.class); // Create a new subdirectory for each test perTestTempSubDir = new File(tempDir, testName()); @@@ -81,14 -81,14 +82,16 @@@ VolumeManager fs = VolumeManagerImpl.getLocalForTesting(path); ++ expect(server.getContext()).andReturn(context).anyTimes(); ++ expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); expect(context.getCryptoFactory()).andReturn(new GenericCryptoServiceFactory()).anyTimes(); expect(context.getVolumeManager()).andReturn(fs).anyTimes(); -- replay(context); ++ replay(server, context); } @AfterEach public void tearDown() { -- verify(context); ++ verify(server, context); } /** @@@ -105,7 -105,7 +108,7 @@@ walogInHDFStream.flush(); walogInHDFStream.close(); -- LogSorter logSorter = new 
LogSorter(context, config); ++ LogSorter logSorter = new LogSorter(server); LogSorter.LogProcessor logProcessor = logSorter.new LogProcessor(); assertThrows(IllegalArgumentException.class, @@@ -128,7 -128,7 +131,7 @@@ assertFalse(context.getVolumeManager().exists(getFinishedMarkerPath(destPath))); -- LogSorter logSorter = new LogSorter(context, config); ++ LogSorter logSorter = new LogSorter(server); LogSorter.LogProcessor logProcessor = logSorter.new LogProcessor(); logProcessor.sort(context.getVolumeManager(), walogToTest, @@@ -152,7 -152,7 +155,7 @@@ assertFalse(context.getVolumeManager().exists(getFinishedMarkerPath(destPath))); -- LogSorter logSorter = new LogSorter(context, config); ++ LogSorter logSorter = new LogSorter(server); LogSorter.LogProcessor logProcessor = logSorter.new LogProcessor(); logProcessor.sort(context.getVolumeManager(), walogToTest, new Path("file://" + testPath + walogToTest), destPath); diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java index a032aeb191,4d13023a31..c579ea25c2 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java @@@ -65,8 -63,7 +65,8 @@@ public class ExternalDoNothingCompacto CountDownLatch stopped, AtomicReference<Throwable> err) { // Set this to true so that only 1 external compaction is run + final AtomicReference<FileCompactor> ref = new AtomicReference<>(); - this.shutdown = true; + gracefulShutdown(getContext().rpcCreds()); return new FileCompactorRunnable() { diff --cc test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java index 9c1234efd4,0000000000..4b377e09da mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java @@@ -1,421 -1,0 +1,422 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.fate; + +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.server.ServerContext; ++import org.apache.accumulo.test.fate.FateTestRunner.TestEnv; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Iterators; + +public abstract class FateExecutionOrderIT extends SharedMiniClusterBase + implements FateTestRunner<FateExecutionOrderIT.FeoTestEnv> { + + public static class FeoTestEnv extends TestEnv { + private final AccumuloClient client; + + public FeoTestEnv(AccumuloClient client) { + this.client = client; + } + + AccumuloClient getClient() { + return client; + } + } + + public static class FirstOp implements Repo<FateExecutionOrderIT.FeoTestEnv> { + + private static final long serialVersionUID = 1L; + + protected boolean isTrackingDataSet(FateId tid, FeoTestEnv env, String step) throws Exception { + try (Scanner scanner = env.getClient().createScanner(FATE_TRACKING_TABLE)) { + return scanner.stream() + .anyMatch(e -> e.getKey().getColumnFamily().toString().equals(tid.canonical()) + && e.getValue().toString().equals(step)); + } + } + + protected static void insertTrackingData(FateId tid, FeoTestEnv env, String step) + throws TableNotFoundException, MutationsRejectedException { + try (BatchWriter bw = env.getClient().createBatchWriter(FATE_TRACKING_TABLE)) { + Mutation mut = new Mutation(Long.toString(System.currentTimeMillis())); + mut.put(tid.canonical(), "", step); + bw.addMutation(mut); + } + } + + @Override + public long isReady(FateId tid, FeoTestEnv env) throws Exception { + // First call to isReady will return that it's not ready (defer time of 100ms), inserting + // the data 'isReady1' so we know isReady was called 
once. The second attempt (after the + // deferral time) will pass as ready (return 0) and insert the data 'isReady2' so we know + // the second call to isReady was made + Thread.sleep(50); + var step = this.getName() + "::isReady"; + if (isTrackingDataSet(tid, env, step + "1")) { + insertTrackingData(tid, env, step + "2"); + return 0; + } else { + insertTrackingData(tid, env, step + "1"); + return 100; + } + } + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv env) throws Exception { + Thread.sleep(50); + insertTrackingData(tid, env, this.getName() + "::call"); + return new SecondOp(); + } + + @Override + public void undo(FateId fateId, FeoTestEnv environment) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public String getReturn() { + return ""; + } + } + + public static class SecondOp extends FirstOp { + private static final long serialVersionUID = 1L; + + @Override + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception { + super.call(tid, environment); + return new LastOp(); + } + + } + + public static class LastOp extends FirstOp { + private static final long serialVersionUID = 1L; + + @Override + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception { + super.call(tid, environment); + return null; + } + } + + private static final String FATE_TRACKING_TABLE = "fate_tracking"; + + @BeforeAll + public static void setup() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + NewTableConfiguration ntc = new NewTableConfiguration(); + ntc.withInitialTabletAvailability(TabletAvailability.HOSTED); + client.tableOperations().create(FATE_TRACKING_TABLE, ntc); + } + } + + @AfterAll + public static void teardown() throws Exception { + SharedMiniClusterBase.stopMiniCluster(); + } + + @BeforeEach + public void before() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().deleteRows(FATE_TRACKING_TABLE, null, null); + } + } + + private void waitFor(FateStore<FeoTestEnv> store, FateId txid) throws Exception { + while (store.read(txid).getStatus() != SUCCESSFUL) { + Thread.sleep(50); + } + } + + protected Fate<FeoTestEnv> initializeFate(AccumuloClient client, FateStore<FeoTestEnv> store) { + ConfigurationCopy config = new ConfigurationCopy(); + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); + return new Fate<>(new FeoTestEnv(client), store, false, r -> r + "", config); + } + + private static Entry<FateId,String> toIdStep(Entry<Key,Value> e) { + return new AbstractMap.SimpleImmutableEntry<>( + FateId.from(e.getKey().getColumnFamily().toString()), e.getValue().toString()); + } + + @Test + public void testInterleaving() throws Exception { + executeTest(this::testInterleaving); + } + + protected void testInterleaving(FateStore<FeoTestEnv> store, ServerContext sctx) + throws Exception { + + // This test verifies that FATE will interleave at least once between fate operations when + // their isReady() returns > 0. Interleaving is not guaranteed, so we just check for one + // occurrence which is highly unlikely to fail unless something is broken with FATE. + // This test also ensures that the expected order of operations occurs per fate op. 
+ // Interleaving should have no effect on this. + + final int numFateIds = 3; + FateId[] fateIds = new FateId[numFateIds]; + + for (int i = 0; i < numFateIds; i++) { + fateIds[i] = store.create(); + var txStore = store.reserve(fateIds[i]); + try { + txStore.push(new FirstOp()); + txStore.setTransactionInfo(TxInfo.TX_NAME, "TEST_" + i); + txStore.setStatus(SUBMITTED); + } finally { + txStore.unreserve(Duration.ZERO); + } + } + + Fate<FeoTestEnv> fate = null; + + // The execution order of the transactions is not according to their insertion + // order. However, we do know that the first step of each transaction will be + // executed before the second steps. + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + fate = initializeFate(client, store); + + for (var fateId : fateIds) { + waitFor(store, fateId); + } + + Scanner scanner = client.createScanner(FATE_TRACKING_TABLE); + var iter = scanner.stream().map(FateExecutionOrderIT::toIdStep).iterator(); + + // we should see the following execution order for all fate ids: + // FirstOp::isReady1, FirstOp::isReady2, FirstOp::call, + // SecondOp::isReady1, SecondOp::isReady2, SecondOp::call, + // LastOp::isReady1, LastOp::isReady2, LastOp::call + // the first isReady of each op will defer the op to be executed later, allowing for the FATE + // thread to interleave and work on another fate id, but may not always interleave. + // It is unlikely that the FATE will not interleave at least once in a run, so we will check + // for at least one occurrence. + int interleaves = 0; + int i = 0; + Map.Entry<FateId,String> prevOp = null; + var expRunOrder = List.of("FirstOp::isReady1", "FirstOp::isReady2", "FirstOp::call", + "SecondOp::isReady1", "SecondOp::isReady2", "SecondOp::call", "LastOp::isReady1", + "LastOp::isReady2", "LastOp::call"); + var fateIdsToExpRunOrder = Map.of(fateIds[0], new ArrayList<>(expRunOrder), fateIds[1], + new ArrayList<>(expRunOrder), fateIds[2], new ArrayList<>(expRunOrder)); + + while (iter.hasNext()) { + var currOp = iter.next(); + FateId fateId = currOp.getKey(); + String currStep = currOp.getValue(); + var expRunOrderFateId = fateIdsToExpRunOrder.get(fateId); + + boolean passedFirstStep = !currStep.equals(expRunOrder.get(0)); + boolean prevFateIdDiffered = prevOp != null && !prevOp.getKey().equals(fateId); + if (passedFirstStep && prevFateIdDiffered) { + interleaves++; + } + assertEquals(currStep, expRunOrderFateId.remove(0)); + prevOp = currOp; + i++; + } + + assertTrue(interleaves > 0); + assertEquals(i, expRunOrder.size() * numFateIds); + assertEquals(numFateIds, fateIdsToExpRunOrder.size()); + for (var expRunOrderFateId : fateIdsToExpRunOrder.values()) { + assertTrue(expRunOrderFateId.isEmpty()); + } + + } finally { + if (fate != null) { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + } + + public static class FirstNonInterleavingOp extends FirstOp { + + private static final long serialVersionUID = 1L; + + @Override + public long isReady(FateId tid, FeoTestEnv env) throws Exception { + Thread.sleep(50); + insertTrackingData(tid, env, this.getName() + "::isReady"); + return 0; + } + + @Override + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv manager) throws Exception { + Thread.sleep(50); + insertTrackingData(tid, manager, this.getName() + "::call"); + return new SecondNonInterleavingOp(); + } + } + + public static class SecondNonInterleavingOp extends FirstNonInterleavingOp { + + private static final long serialVersionUID = 1L; + + @Override + public Repo<FeoTestEnv> call(FateId 
tid, FeoTestEnv environment) throws Exception { + super.call(tid, environment); + return new LastNonInterleavingOp(); + } + + } + + public static class LastNonInterleavingOp extends FirstNonInterleavingOp { + + private static final long serialVersionUID = 1L; + + @Override + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception { + super.call(tid, environment); + return null; + } + + } + + @Test + public void testNonInterleaving() throws Exception { + executeTest(this::testNonInterleaving); + } + + protected void testNonInterleaving(FateStore<FeoTestEnv> store, ServerContext sctx) + throws Exception { + + // This test ensures that when isReady() always returns zero that all the fate steps will + // execute immediately + + final int numFateIds = 3; + FateId[] fateIds = new FateId[numFateIds]; + + for (int i = 0; i < numFateIds; i++) { + fateIds[i] = store.create(); + var txStore = store.reserve(fateIds[i]); + try { + txStore.push(new FirstNonInterleavingOp()); + txStore.setTransactionInfo(TxInfo.TX_NAME, "TEST_" + i); + txStore.setStatus(SUBMITTED); + } finally { + txStore.unreserve(Duration.ZERO); + } + } + + Fate<FeoTestEnv> fate = null; + + // The execution order of the transactions is not according to their insertion + // order. In this case, without interleaving, a transaction will run start to finish + // before moving on to the next transaction + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + fate = initializeFate(client, store); + + for (var fateId : fateIds) { + waitFor(store, fateId); + } + + Scanner scanner = client.createScanner(FATE_TRACKING_TABLE); + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + + SortedMap<Key,Value> subset = new TreeMap<>(); + + // should see one fate op execute all of it steps + var seenId1 = verifySameIds(iter, subset); + // should see another fate op execute all of it steps + var seenId2 = verifySameIds(iter, subset); + // should see another fate op execute all of it steps + var seenId3 = verifySameIds(iter, subset); + + assertEquals(Set.of(fateIds[0], fateIds[1], fateIds[2]), Set.of(seenId1, seenId2, seenId3)); + + assertFalse(iter.hasNext()); + + } finally { + if (fate != null) { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + } + + private FateId verifySameIds(Iterator<Entry<Key,Value>> iter, SortedMap<Key,Value> subset) { + subset.clear(); + Iterators.limit(iter, 6).forEachRemaining(e -> subset.put(e.getKey(), e.getValue())); + + Text fateId = subset.keySet().iterator().next().getColumnFamily(); + assertTrue(subset.keySet().stream().allMatch(k -> k.getColumnFamily().equals(fateId))); + + // list is used to ensure correct operations and correct order of operations + var expectedVals = List.of("FirstNonInterleavingOp::isReady", "FirstNonInterleavingOp::call", + "SecondNonInterleavingOp::isReady", "SecondNonInterleavingOp::call", + "LastNonInterleavingOp::isReady", "LastNonInterleavingOp::call"); + var actualVals = subset.values().stream().map(Value::toString).collect(Collectors.toList()); + assertEquals(expectedVals, actualVals); + + return FateId.from(fateId.toString()); + } + +} diff --cc test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java index 0000000000,c8e4eec16f..c2d4f9575c mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java @@@ -1,0 -1,273 +1,295 @@@ + /* + * Licensed to the Apache Software 
Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.accumulo.test.functional;
+
+ import static org.junit.jupiter.api.Assertions.assertEquals;
++import static org.junit.jupiter.api.Assertions.assertNotNull;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
+
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.Optional;
+ import java.util.Set;
+ import java.util.stream.IntStream;
+
 -import org.apache.accumulo.compactor.Compactor;
 -import org.apache.accumulo.coordinator.CompactionCoordinator;
+ import org.apache.accumulo.core.Constants;
+ import org.apache.accumulo.core.client.Accumulo;
+ import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.BatchWriter;
+ import org.apache.accumulo.core.client.IteratorSetting;
+ import org.apache.accumulo.core.client.Scanner;
+ import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+ import org.apache.accumulo.core.client.admin.CompactionConfig;
+ import org.apache.accumulo.core.client.admin.NewTableConfiguration;
++import org.apache.accumulo.core.client.admin.servers.ServerId;
+ import org.apache.accumulo.core.conf.ClientProperty;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.data.Range;
+ import org.apache.accumulo.core.data.TableId;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.lock.ServiceLock;
 -import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath;
+ import org.apache.accumulo.core.lock.ServiceLockData;
+ import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
++import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
++import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
+ import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+ import org.apache.accumulo.core.security.Authorizations;
 -import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
++import org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner;
+ import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
+ import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+ import org.apache.accumulo.harness.SharedMiniClusterBase;
+ import org.apache.accumulo.minicluster.ServerType;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterControl;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+ import org.apache.accumulo.server.ServerContext;
+ import org.apache.accumulo.server.util.Admin;
+ import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils;
+ import org.apache.accumulo.test.util.Wait;
 -import org.apache.accumulo.tserver.ScanServer;
+ import org.apache.hadoop.conf.Configuration;
+ import org.junit.jupiter.api.AfterAll;
+ import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.api.Test;
+
+ import com.google.common.net.HostAndPort;
+
+ public class GracefulShutdownIT extends SharedMiniClusterBase {
+
+   private static final String GROUP_NAME = "graceful";
+
+   // @formatter:off
+   private static final String clientConfiguration =
+       "["+
+       " {"+
+       "   \"isDefault\": true,"+
+       "   \"maxBusyTimeout\": \"5m\","+
+       "   \"busyTimeoutMultiplier\": 8,"+
+       "   \"group\":" + GROUP_NAME + ","+
+       "   \"scanTypeActivations\": [graceful],"+
+       "   \"attemptPlans\": ["+
+       "     {"+
+       "       \"servers\": \"3\","+
+       "       \"busyTimeout\": \"33ms\","+
+       "       \"salt\": \"one\""+
+       "     }"+
+       "   ]"+
+       " }"+
+       "]";
+   // @formatter:on
+
+   private static class GracefulShutdownITConfig implements MiniClusterConfigurationCallback {
+
+     @Override
+     public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
 -      cfg.setNumCompactors(0);
 -      cfg.setNumScanServers(0);
 -      cfg.setNumTservers(2);
 -      cfg.setProperty(Property.COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL, "5s");
++      cfg.getClusterServerConfiguration().setNumDefaultCompactors(0);
++      cfg.getClusterServerConfiguration().setNumDefaultScanServers(0);
++      cfg.getClusterServerConfiguration().setNumDefaultTabletServers(2);
+       cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, "5s");
 -      cfg.setProperty(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, "3s");
 -      cfg.setProperty(Property.COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH, "true");
+       cfg.setProperty(Property.COMPACTOR_CANCEL_CHECK_INTERVAL, "5s");
+       cfg.setProperty(Property.COMPACTOR_PORTSEARCH, "true");
 -      cfg.setProperty("tserver.compaction.major.service." + GROUP_NAME + ".planner",
 -          DefaultCompactionPlanner.class.getName());
 -      cfg.setProperty("tserver.compaction.major.service." + GROUP_NAME + ".planner.opts.executors",
 -          "[{'name':'all', 'type': 'external', 'queue': '" + GROUP_NAME + "'}]");
++      cfg.setProperty(Property.COMPACTION_SERVICE_PREFIX.getKey() + GROUP_NAME + ".planner",
++          RatioBasedCompactionPlanner.class.getName());
++      cfg.setProperty(
++          Property.COMPACTION_SERVICE_PREFIX.getKey() + GROUP_NAME + ".planner.opts.groups",
++          "[{'group': '" + GROUP_NAME + "'}]");
+       cfg.setClientProperty(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() + "profiles",
+           clientConfiguration);
+       // Timeout scan sessions after being idle for 3 seconds
+       cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
+     }
+   }
+
+   @BeforeAll
+   public static void startup() throws Exception {
+     SharedMiniClusterBase.startMiniClusterWithConfig(new GracefulShutdownITConfig());
+   }
+
+   @AfterAll
+   public static void shutdown() throws Exception {
+     SharedMiniClusterBase.stopMiniCluster();
+   }
+
+   @Test
+   public void testGracefulShutdown() throws Exception {
+
+     // Start ScanServers and Compactors using named groups
+     final MiniAccumuloClusterControl control = getCluster().getClusterControl();
+
+     try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+       final ServerContext ctx = getCluster().getServerContext();
+       final String tableName = getUniqueNames(1)[0];
+
+       final NewTableConfiguration ntc = new NewTableConfiguration();
+       ntc.setProperties(Map.of(Property.TABLE_MAJC_RATIO.getKey(), "10",
+           "table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(),
+           "table.compaction.dispatcher.opts.service", GROUP_NAME));
+
+       client.tableOperations().create(tableName, ntc);
+       final TableId tid = ctx.getTableId(tableName);
+
+       // Insert 10 rows, flush after every row to create 10 files
+       try (BatchWriter writer = client.createBatchWriter(tableName)) {
+         for (int i : IntStream.rangeClosed(1, 10).toArray()) {
+           String val = i + "";
+           Mutation m = new Mutation(val);
+           m.put(val, val, val);
+           writer.addMutation(m);
+           writer.flush();
+           client.tableOperations().flush(tableName, null, null, true);
+         }
+       }
+       long numFiles = getNumFilesForTable(ctx, tid);
+       assertEquals(10, numFiles);
+       client.instanceOperations().waitForBalance();
+
+       // Restart Garbage Collector
+       final ServiceLockPath gcLockPath =
 -          ServiceLock.path(ctx.getZooKeeperRoot() + Constants.ZGC_LOCK);
++          getCluster().getServerContext().getServerPaths().getGarbageCollector(true);
+       Optional<ServiceLockData> data = ServiceLock.getLockData(ctx.getZooSession(), gcLockPath);
+       assertTrue(data.isPresent());
+       final HostAndPort gcAddress = data.orElseThrow().getAddress(ThriftService.GC);
+       assertTrue(!control.getProcesses(ServerType.GARBAGE_COLLECTOR).isEmpty());
+       // Don't call `new Admin().execute(new String[] {"signalShutdown", "-h ", host, "-p ",
+       // Integer.toString(port)})`
+       // because this poisons the SingletonManager and puts it into SERVER mode
+       Admin.signalGracefulShutdown(ctx, gcAddress.toString());
+       Wait.waitFor(() -> {
+         control.refreshProcesses(ServerType.GARBAGE_COLLECTOR);
+         return control.getProcesses(ServerType.GARBAGE_COLLECTOR).isEmpty();
+       });
+
+       // Restart Tablet Server
 -      final List<String> tservers = client.instanceOperations().getTabletServers();
++      final Set<ServiceLockPath> tservers = getCluster().getServerContext().getServerPaths()
++          .getTabletServer((rg) -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME),
++              AddressSelector.all(), true);
+       assertEquals(2, tservers.size());
 -      final HostAndPort tserverAddress = HostAndPort.fromString(tservers.get(0));
++      final HostAndPort tserverAddress =
++          HostAndPort.fromString(tservers.iterator().next().getServer());
+       Admin.signalGracefulShutdown(ctx, tserverAddress.toString());
+       Wait.waitFor(() -> {
+         control.refreshProcesses(ServerType.TABLET_SERVER);
+         return control.getProcesses(ServerType.TABLET_SERVER).size() == 1;
+       });
+       client.instanceOperations().waitForBalance();
+       control.start(ServerType.TABLET_SERVER);
+       Wait.waitFor(() -> control.getProcesses(ServerType.TABLET_SERVER).size() == 2);
+       client.instanceOperations().waitForBalance();
+
+       // Restart Manager
 -      final List<String> managers = client.instanceOperations().getManagerLocations();
 -      assertEquals(1, managers.size());
 -      final HostAndPort managerAddress = HostAndPort.fromString(managers.get(0));
++      final ServiceLockPath manager =
++          getCluster().getServerContext().getServerPaths().getManager(true);
++      assertNotNull(manager);
++      Set<ServerId> managerLocations =
++          client.instanceOperations().getServers(ServerId.Type.MANAGER);
++      assertNotNull(managerLocations);
++      assertEquals(1, managerLocations.size());
++      final HostAndPort managerAddress =
++          HostAndPort.fromString(managerLocations.iterator().next().toHostPortString());
+       Admin.signalGracefulShutdown(ctx, managerAddress.toString());
+       Wait.waitFor(() -> {
+         control.refreshProcesses(ServerType.MANAGER);
+         return control.getProcesses(ServerType.MANAGER).isEmpty();
+       });
+       control.start(ServerType.MANAGER);
+       Wait.waitFor(() -> control.getProcesses(ServerType.MANAGER).size() == 1);
+       client.instanceOperations().waitForBalance();
+
+       // Compact table and shutdown compactor
 -      control.startCoordinator(CompactionCoordinator.class);
 -      getCluster().getConfig().setNumCompactors(1);
 -      control.startCompactors(Compactor.class, 1, GROUP_NAME);
 -      Wait.waitFor(() -> client.instanceOperations().getCompactors().size() == 1);
 -      final Set<String> compactors = client.instanceOperations().getCompactors();
 -      final HostAndPort compactorAddress = HostAndPort.fromString(compactors.iterator().next());
++      getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP_NAME,
++          1);
++      getCluster().getClusterControl().start(ServerType.COMPACTOR);
++
++      Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
++          .getCompactor((rg) -> rg.equals(GROUP_NAME), AddressSelector.all(), true).size() == 1);
++      final Set<ServiceLockPath> compactors = getCluster().getServerContext().getServerPaths()
++          .getCompactor((rg) -> rg.equals(GROUP_NAME), AddressSelector.all(), true);
++      final HostAndPort compactorAddress =
++          HostAndPort.fromString(compactors.iterator().next().getServer());
+
+       final CompactionConfig cc = new CompactionConfig();
+       final IteratorSetting is = new IteratorSetting(100, SlowIterator.class);
+       SlowIterator.setSeekSleepTime(is, 1000);
+       SlowIterator.setSleepTime(is, 1000);
+       cc.setIterators(List.of(is));
+       cc.setWait(false);
+
+       final long numFiles2 = getNumFilesForTable(ctx, tid);
+       assertEquals(numFiles2, numFiles);
 -      assertEquals(0, ExternalCompactionTestUtils.getRunningCompactions(ctx).getCompactionsSize());
++      Set<ServerId> newManagerLocations =
++          client.instanceOperations().getServers(ServerId.Type.MANAGER);
++      assertNotNull(newManagerLocations);
++      assertEquals(1, newManagerLocations.size());
++      final HostAndPort newManagerAddress =
++          HostAndPort.fromString(newManagerLocations.iterator().next().toHostPortString());
++      assertEquals(0, ExternalCompactionTestUtils
++          .getRunningCompactions(ctx, Optional.of(newManagerAddress)).getCompactionsSize());
+
+       client.tableOperations().compact(tableName, cc);
 -      Wait.waitFor(
 -          () -> ExternalCompactionTestUtils.getRunningCompactions(ctx).getCompactionsSize() > 0);
++      Wait.waitFor(() -> ExternalCompactionTestUtils
++          .getRunningCompactions(ctx, Optional.of(newManagerAddress)).getCompactionsSize() > 0);
+       Admin.signalGracefulShutdown(ctx, compactorAddress.toString());
+       Wait.waitFor(() -> {
+         control.refreshProcesses(ServerType.COMPACTOR);
+         return control.getProcesses(ServerType.COMPACTOR).isEmpty();
+       });
+       final long numFiles3 = getNumFilesForTable(ctx, tid);
+       assertTrue(numFiles3 < numFiles2);
+       assertEquals(1, numFiles3);
+
 -      getCluster().getConfig().setNumScanServers(1);
 -      control.startScanServer(ScanServer.class, 1, GROUP_NAME);
 -      Wait.waitFor(() -> client.instanceOperations().getScanServers().size() == 1);
 -      final Set<String> sservers = client.instanceOperations().getScanServers();
 -      final HostAndPort sserver = HostAndPort.fromString(sservers.iterator().next());
++      getCluster().getConfig().getClusterServerConfiguration()
++          .addScanServerResourceGroup(GROUP_NAME, 1);
++      getCluster().getClusterControl().start(ServerType.SCAN_SERVER);
++
++      Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
++          .getScanServer((rg) -> rg.equals(GROUP_NAME), AddressSelector.all(), true).size() == 1);
++      final Set<ServiceLockPath> sservers = getCluster().getServerContext().getServerPaths()
++          .getScanServer((rg) -> rg.equals(GROUP_NAME), AddressSelector.all(), true);
++      final HostAndPort sserver = HostAndPort.fromString(sservers.iterator().next().getServer());
+       try (final Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+         scanner.setRange(new Range());
+         scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+         scanner.setExecutionHints(Map.of("scan_type", "graceful"));
+         scanner.addScanIterator(is); // add the slow iterator
+         scanner.setBatchSize(1);
+         int count = 0;
+         for (Entry<Key,Value> e : scanner) {
+           count++;
+           if (count == 2) {
+             Admin.signalGracefulShutdown(ctx, sserver.toString());
+           }
+         }
+         assertEquals(10, count);
+         Wait.waitFor(() -> {
+           control.refreshProcesses(ServerType.SCAN_SERVER);
+           return control.getProcesses(ServerType.SCAN_SERVER).isEmpty();
+         });
+
+       }
+
+     }
+
+   }
+
+   long getNumFilesForTable(ServerContext ctx, TableId tid) {
+     try (TabletsMetadata tablets = ctx.getAmple().readTablets().forTable(tid).build()) {
+       return tablets.stream().mapToLong(tm -> tm.getFiles().size()).sum();
+     }
+   }
+ }
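
Note for anyone adapting this test: each server type above is exercised with the same
two-step pattern: signal a graceful shutdown over RPC through the test's in-process
ServerContext, then poll the minicluster's process table until the target process has
actually exited. Waiting on the process list (rather than, say, the ZooKeeper lock)
is what proves the graceful path really terminates the JVM. A minimal sketch of that
pattern follows, using the same harness types the test imports; `stopGracefully` is a
hypothetical helper name for illustration, not an Accumulo API.

    import org.apache.accumulo.minicluster.ServerType;
    import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterControl;
    import org.apache.accumulo.server.ServerContext;
    import org.apache.accumulo.server.util.Admin;
    import org.apache.accumulo.test.util.Wait;

    public class GracefulShutdownHelper {

      // Hypothetical helper (not part of this commit) condensing the pattern used
      // repeatedly in GracefulShutdownIT: ask the server at hostPort to shut down
      // gracefully, then wait until its process disappears from the minicluster.
      static void stopGracefully(ServerContext ctx, MiniAccumuloClusterControl control,
          ServerType type, String hostPort) throws Exception {
        // Signal via the in-process ServerContext; the test deliberately avoids
        // `new Admin().execute(...)`, which would poison the SingletonManager.
        Admin.signalGracefulShutdown(ctx, hostPort);
        Wait.waitFor(() -> {
          control.refreshProcesses(type); // drop handles to processes that exited
          return control.getProcesses(type).isEmpty();
        });
      }
    }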