This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 5b9910f5d43f52e4df2badbab020bb1ea6afbd75 Merge: 3787f1b49d 96c4582794 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Sat Dec 7 17:33:13 2024 +0000 Merge branch '3.1' .../java/org/apache/accumulo/core/Constants.java | 2 - .../org/apache/accumulo/core/conf/Property.java | 5 + .../org/apache/accumulo/core/lock/ServiceLock.java | 23 ++ .../apache/accumulo/core/logging/TabletLogger.java | 13 + .../core/metadata/schema/MetadataSchema.java | 16 - .../org/apache/accumulo/server/AbstractServer.java | 70 ++++- .../accumulo/server/compaction/FileCompactor.java | 12 +- .../org/apache/accumulo/server/fs/FileManager.java | 15 +- .../accumulo/server/init/ZooKeeperInitializer.java | 2 - .../accumulo/server/problems/ProblemReport.java | 252 ---------------- .../server/problems/ProblemReportingIterator.java | 24 +- .../accumulo/server/problems/ProblemReports.java | 323 --------------------- .../accumulo/server/problems/ProblemType.java | 23 -- .../server/problems/ProblemReportTest.java | 205 ------------- .../problems/ProblemReportingIteratorTest.java | 2 +- .../org/apache/accumulo/compactor/Compactor.java | 4 + .../apache/accumulo/gc/SimpleGarbageCollector.java | 7 +- .../java/org/apache/accumulo/manager/Manager.java | 6 + .../accumulo/manager/tableOps/delete/CleanUp.java | 8 - .../accumulo/manager/upgrade/Upgrader11to12.java | 147 ++++++++++ .../manager/upgrade/Upgrader11to12Test.java | 2 + .../java/org/apache/accumulo/monitor/Monitor.java | 26 +- .../monitor/rest/problems/ProblemDetail.java | 42 --- .../rest/problems/ProblemDetailInformation.java | 69 ----- .../monitor/rest/problems/ProblemSummary.java | 42 --- .../rest/problems/ProblemSummaryInformation.java | 62 ---- .../monitor/rest/problems/ProblemsResource.java | 185 ------------ .../monitor/rest/status/StatusInformation.java | 6 +- .../monitor/rest/status/StatusResource.java | 3 +- .../accumulo/monitor/resources/js/functions.js | 40 +-- .../apache/accumulo/monitor/resources/js/navbar.js | 34 --- 
.../accumulo/monitor/resources/js/problems.js | 167 ----------- .../apache/accumulo/monitor/templates/navbar.ftl | 8 - .../apache/accumulo/monitor/templates/problems.ftl | 71 ----- .../apache/accumulo/tserver/AssignmentHandler.java | 19 +- .../org/apache/accumulo/tserver/TabletServer.java | 1 + .../accumulo/tserver/tablet/MinorCompactor.java | 15 - .../org/apache/accumulo/tserver/tablet/Tablet.java | 5 - .../test/functional/HalfDeadServerWatcherIT.java | 225 ++++++++++++++ .../apache/accumulo/test/lock/ServiceLockIT.java | 24 +- 40 files changed, 550 insertions(+), 1655 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/Constants.java index 28a9d8ea36,e3480686d7..6270fcb6c1 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@@ -74,8 -75,8 +74,6 @@@ public class Constants public static final String ZDEAD = "/dead"; public static final String ZDEADTSERVERS = ZDEAD + "/tservers"; - public static final String ZPROBLEMS = "/problems"; - public static final String BULK_ARBITRATOR_TYPE = "bulkTx"; -- public static final String ZFATE = "/fate"; public static final String ZNEXT_FILE = "/next_file"; diff --cc core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java index 40dc0b5eb1,0b6849216a..c9716b561e --- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java @@@ -23,14 -23,13 +23,15 @@@ import static java.util.stream.Collecto import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.SortedSet; import java.util.UUID; -import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; + import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; import 
org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletFile; diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 9374364a07,b7269eb494..5360c98274 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@@ -529,20 -415,16 +529,4 @@@ public class MetadataSchema } - /** - * Holds error message processing flags - */ - public static class ProblemSection { - public static class ExternalCompactionSection { -- private static final Section section = - new Section(RESERVED_PREFIX + "err_", true, RESERVED_PREFIX + "err`", false); - new Section(RESERVED_PREFIX + "ecomp", true, RESERVED_PREFIX + "ecomq", false); -- -- public static Range getRange() { -- return section.getRange(); -- } -- -- public static String getRowPrefix() { -- return section.getRowPrefix(); -- } - -- } } diff --cc server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index ba621f6b01,3cc20c1e22..1dac3b0a60 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@@ -20,20 -20,22 +20,24 @@@ package org.apache.accumulo.server import static java.util.concurrent.TimeUnit.MILLISECONDS; + import java.util.OptionalInt; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; +import 
org.apache.accumulo.core.conf.SiteConfiguration; + import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.trace.TraceUtil; + import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; + import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.mem.LowMemoryDetector; import org.apache.accumulo.server.metrics.ProcessMetrics; import org.apache.accumulo.server.security.SecurityUtil; @@@ -47,21 -51,21 +53,24 @@@ public abstract class AbstractServer im private final ServerContext context; protected final String applicationName; private final String hostname; + private final String resourceGroup; + private final Logger log; private final ProcessMetrics processMetrics; protected final long idleReportingPeriodMillis; private volatile Timer idlePeriodTimer = null; + private volatile Thread serverThread; + private volatile Thread verificationThread; - protected AbstractServer(String appName, ConfigOpts opts, String[] args) { + protected AbstractServer(String appName, ConfigOpts opts, + Function<SiteConfiguration,ServerContext> serverContextFactory, String[] args) { this.applicationName = appName; opts.parseArgs(appName, args); var siteConfig = opts.getSiteConfiguration(); this.hostname = siteConfig.get(Property.GENERAL_PROCESS_BIND_ADDRESS); + this.resourceGroup = getResourceGroupPropertyValue(siteConfig); SecurityUtil.serverLogin(siteConfig); - context = new ServerContext(siteConfig); + context = serverContextFactory.apply(siteConfig); - Logger log = LoggerFactory.getLogger(getClass()); + log = LoggerFactory.getLogger(getClass()); log.info("Version " + Constants.VERSION); log.info("Instance " + context.getInstanceID()); context.init(appName); diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 3eabcedbf6,b9cded93fa..ca19dec098 --- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -1018,4 -978,9 +1018,8 @@@ public class Compactor extends Abstract } } + @Override + public ServiceLock getLock() { + return compactorLock; + } - } diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 4ba44a5fef,62c2280b84..65f80b1864 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@@ -89,12 -77,10 +89,12 @@@ public class SimpleGarbageCollector ext new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats()); private final GcCycleMetrics gcCycleMetrics = new GcCycleMetrics(); - private ServiceLock gcLock; + + private final Timer lastCompactorCheck = Timer.startNew(); + SimpleGarbageCollector(ConfigOpts opts, String[] args) { - super("gc", opts, args); + super("gc", opts, ServerContext::new, args); final AccumuloConfiguration conf = getConfiguration(); diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 1593596ccf,dfb672013f..062e306202 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -1858,28 -1934,9 +1859,33 @@@ public class Manager extends AbstractSe tabletBalancer.getAssignments(params); } + public TabletStateStore getTabletStateStore(DataLevel level) { + switch (level) { + case METADATA: + return this.metadataTabletStore; + case ROOT: + return this.rootTabletStore; + case USER: + return this.userTabletStore; + default: + throw new IllegalStateException("Unhandled DataLevel value: " + level); + } + } + + @Override + public void registerMetrics(MeterRegistry registry) { + super.registerMetrics(registry); + compactionCoordinator.registerMetrics(registry); + } + + private 
Map<FateInstanceType,Fate<Manager>> getFateRefs() { + var fateRefs = this.fateRefs.get(); + Preconditions.checkState(fateRefs != null, "Unexpected null fate references map"); + return fateRefs; + } ++ + @Override + public ServiceLock getLock() { + return managerLock; + } - } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java index 142fe479d5,c45fa1e54e..b8c58164d4 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java @@@ -44,7 -47,7 +44,6 @@@ import org.apache.accumulo.manager.Mana import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.Utils; import org.apache.accumulo.server.fs.VolumeManager; - import org.apache.accumulo.server.problems.ProblemReports; -import org.apache.accumulo.server.manager.state.MetaDataTableScanner; import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 12aa0989d4,239394f6ab..1ac2914df8 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@@ -82,10 -80,7 +81,8 @@@ import org.apache.accumulo.monitor.rest import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.HighlyAvailableService; import org.apache.accumulo.server.ServerContext; - import org.apache.accumulo.server.problems.ProblemReports; - import org.apache.accumulo.server.problems.ProblemType; import org.apache.accumulo.server.util.TableInfoUtil; +import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletHolder; @@@ -144,12 -139,14 +141,10 @@@ public class Monitor extends 
AbstractSe private final AtomicBoolean fetching = new AtomicBoolean(false); private ManagerMonitorInfo mmi; - private Map<TableId,Map<ProblemType,Integer>> problemSummary = Collections.emptyMap(); - private Exception problemException; private GCStatus gcStatus; - private Optional<HostAndPort> coordinatorHost = Optional.empty(); - private long coordinatorCheckNanos = 0L; - private CompactionCoordinatorService.Client coordinatorClient; + private volatile Optional<HostAndPort> coordinatorHost = Optional.empty(); private final String coordinatorMissingMsg = - "Error getting the compaction coordinator. Check that it is running. It is not " - + "started automatically with other cluster processes so must be started by running " - + "'accumulo compaction-coordinator'."; + "Error getting the compaction coordinator client. Check that the Manager is running."; private EmbeddedWebServer server; private int livePort = 0; @@@ -312,16 -294,23 +307,8 @@@ this.totalLookups = totalLookups; } - try { - this.problemSummary = ProblemReports.getInstance(context).summarize(); - this.problemException = null; - } catch (Exception e) { - log.info("Failed to obtain problem reports ", e); - this.problemSummary = Collections.emptyMap(); - this.problemException = e; - } - // check for compaction coordinator host and only notify its discovery - Optional<HostAndPort> previousHost; - if (System.nanoTime() - coordinatorCheckNanos > fetchTimeNanos) { - previousHost = coordinatorHost; - coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context); - coordinatorCheckNanos = System.nanoTime(); - if (previousHost.isEmpty() && coordinatorHost.isPresent()) { - log.info("External Compaction Coordinator found at {}", coordinatorHost.orElseThrow()); - } - } - } finally { - if (coordinatorClient != null) { - ThriftUtil.returnClient(coordinatorClient, context); - coordinatorClient = null; - } lastRecalc.set(currentTime); // stop fetching; log an error if this thread wasn't already fetching if 
(!fetching.compareAndSet(true, false)) { diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java index ec56611a03,1d6f1fe244..5d55a481fa --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java @@@ -19,9 -19,10 +19,8 @@@ package org.apache.accumulo.tserver; import static java.util.concurrent.TimeUnit.MINUTES; - import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD; -import java.util.Arrays; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloException; @@@ -36,13 -37,12 +35,10 @@@ import org.apache.accumulo.core.util.th import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.manager.state.Assignment; import org.apache.accumulo.server.manager.state.TabletStateStore; - import org.apache.accumulo.server.problems.ProblemReport; - import org.apache.accumulo.server.problems.ProblemReports; -import org.apache.accumulo.server.util.ManagerMetadataUtil; import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager; import org.apache.accumulo.tserver.managermessage.TabletStatusMessage; import org.apache.accumulo.tserver.tablet.Tablet; -import org.apache.accumulo.tserver.tablet.TabletData; -import org.apache.hadoop.io.Text; +import org.apache.accumulo.tserver.tablet.Tablet.RefreshPurpose; - import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -100,7 -100,7 +96,6 @@@ class AssignmentHandler implements Runn } // check Metadata table before accepting assignment -- Text locationToOpen = null; TabletMetadata tabletMetadata = null; boolean canLoad = false; try { diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index a022910cd7,7482e75a6e..d69dd1b74b --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@@ -84,15 -92,19 +84,12 @@@ import org.apache.accumulo.core.tablets import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.volume.Volume; -import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.compaction.CompactionStats; -import org.apache.accumulo.server.compaction.PausedCompactionMetrics; -import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl; -import org.apache.accumulo.server.fs.VolumeUtil; -import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles; +import org.apache.accumulo.server.fs.VolumeManager; - import org.apache.accumulo.server.problems.ProblemReport; - import org.apache.accumulo.server.problems.ProblemReports; - import org.apache.accumulo.server.problems.ProblemType; +import org.apache.accumulo.server.tablets.ConditionCheckerContext.ConditionChecker; +import org.apache.accumulo.server.tablets.TabletNameGenerator; import org.apache.accumulo.server.tablets.TabletTime; -import org.apache.accumulo.server.tablets.UniqueNameAllocator; -import org.apache.accumulo.server.util.FileUtil; -import org.apache.accumulo.server.util.ManagerMetadataUtil; -import org.apache.accumulo.server.util.MetadataTableUtil; -import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker; import org.apache.accumulo.tserver.InMemoryMap; import org.apache.accumulo.tserver.MinorCompactionReason; import org.apache.accumulo.tserver.TabletServer; diff --cc test/src/main/java/org/apache/accumulo/test/functional/HalfDeadServerWatcherIT.java index 0000000000,83dd3c83a4..3f791f411b mode 000000,100644..100644 --- 
a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadServerWatcherIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadServerWatcherIT.java @@@ -1,0 -1,216 +1,225 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.accumulo.test.functional; + + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertThrows; + import static org.junit.jupiter.api.Assertions.assertTrue; + import static org.junit.jupiter.api.Assertions.fail; + -import java.io.IOException; -import java.util.List; -import java.util.TreeMap; -import java.util.TreeSet; ++import java.util.Set; + import java.util.concurrent.atomic.AtomicBoolean; ++import java.util.function.Function; + + import org.apache.accumulo.core.Constants; + import org.apache.accumulo.core.cli.ConfigOpts; + import org.apache.accumulo.core.client.Accumulo; + import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.AccumuloException; ++import org.apache.accumulo.core.client.admin.NewTableConfiguration; ++import org.apache.accumulo.core.client.admin.TabletAvailability; ++import org.apache.accumulo.core.client.admin.servers.ServerId; + import org.apache.accumulo.core.conf.Property; ++import org.apache.accumulo.core.conf.SiteConfiguration; + import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; + import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; ++import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; ++import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; + import org.apache.accumulo.core.util.UtilWaitThread; + import org.apache.accumulo.harness.AccumuloClusterHarness; + import org.apache.accumulo.minicluster.ServerType; + import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; + import org.apache.accumulo.server.ServerContext; + import org.apache.accumulo.test.util.Wait; + import org.apache.accumulo.tserver.TabletServer; -import org.apache.accumulo.tserver.tablet.Tablet; -import org.apache.accumulo.tserver.tablet.TabletData; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; + import 
org.apache.zookeeper.KeeperException; + import org.apache.zookeeper.WatchedEvent; + import org.apache.zookeeper.Watcher; + import org.junit.jupiter.api.AfterEach; + import org.junit.jupiter.api.Test; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + ++import com.google.common.net.HostAndPort; ++ + /** + * Test that validates that the TabletServer will be terminated when the lock is removed in + * ZooKeeper, but a Watcher in the TabletServer is preventing the LockWatcher to be invoked. + */ + public class HalfDeadServerWatcherIT extends AccumuloClusterHarness { + + public static class HalfDeadTabletServer extends TabletServer { + + private static final Logger LOG = LoggerFactory.getLogger(HalfDeadTabletServer.class); + + public static void main(String[] args) throws Exception { - try (HalfDeadTabletServer tserver = new HalfDeadTabletServer(new ConfigOpts(), args)) { ++ try (HalfDeadTabletServer tserver = ++ new HalfDeadTabletServer(new ConfigOpts(), ServerContext::new, args)) { + tserver.runServer(); + } + } + + public static class StuckWatcher implements Watcher { + private static final Logger LOG = LoggerFactory.getLogger(StuckWatcher.class); + + @Override + public void process(WatchedEvent event) { + LOG.info("started sleeping..."); + while (true) { + LOG.info("still sleeping..."); + UtilWaitThread.sleep(2000); + } + } + + } + - protected HalfDeadTabletServer(ConfigOpts opts, String[] args) { - super(opts, args); ++ protected HalfDeadTabletServer(ConfigOpts opts, ++ Function<SiteConfiguration,ServerContext> serverContextFactory, String[] args) { ++ super(opts, serverContextFactory, args); + } + + @Override - protected TreeMap<KeyExtent,TabletData> splitTablet(Tablet tablet, byte[] splitPoint) - throws IOException { - LOG.info("In HalfDeadServerWatcherIT::splitTablet"); - TreeMap<KeyExtent,TabletData> results = super.splitTablet(tablet, splitPoint); - if (!tablet.getExtent().isMeta()) { - final TableId tid = tablet.getExtent().tableId(); - final 
String zooRoot = this.getContext().getZooKeeperRoot(); - final String tableZPath = zooRoot + Constants.ZTABLES + "/" + tid.canonical(); - try { - this.getContext().getZooReaderWriter().exists(tableZPath, new StuckWatcher()); - } catch (KeeperException | InterruptedException e) { - LOG.error("Error setting watch at: {}", tableZPath, e); ++ public void evaluateOnDemandTabletsForUnload() { ++ super.evaluateOnDemandTabletsForUnload(); ++ getOnlineTablets().keySet().forEach(ke -> { ++ if (!ke.isMeta()) { ++ final TableId tid = ke.tableId(); ++ final String zooRoot = this.getContext().getZooKeeperRoot(); ++ final String tableZPath = zooRoot + Constants.ZTABLES + "/" + tid.canonical(); ++ try { ++ this.getContext().getZooReaderWriter().exists(tableZPath, new StuckWatcher()); ++ } catch (KeeperException | InterruptedException e) { ++ LOG.error("Error setting watch at: {}", tableZPath, e); ++ } ++ LOG.info("Set StuckWatcher at: {}", tableZPath); + } - LOG.info("Set StuckWatcher at: {}", tableZPath); - } - return results; ++ }); + } + } + + private static final AtomicBoolean USE_VERIFICATION_THREAD = new AtomicBoolean(false); + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + if (USE_VERIFICATION_THREAD.get()) { + cfg.setProperty(Property.GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL, "10s"); + } else { + cfg.setProperty(Property.GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL, "0"); + } + cfg.setServerClass(ServerType.TABLET_SERVER, HalfDeadTabletServer.class); - cfg.setNumCompactors(0); - cfg.setNumScanServers(0); - cfg.setNumTservers(1); ++ cfg.setProperty(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL, "30s"); ++ cfg.getClusterServerConfiguration().setNumDefaultCompactors(1); ++ cfg.getClusterServerConfiguration().setNumDefaultScanServers(0); ++ cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1); + } + + @AfterEach + public void afterTest() throws Exception { + 
getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + super.teardownCluster(); + USE_VERIFICATION_THREAD.set(!USE_VERIFICATION_THREAD.get()); + } + + @Test + public void testOne() throws Exception { + if (USE_VERIFICATION_THREAD.get()) { + // This test should use the verification thread, which should + // end the TabletServer, throw an Exception on the ping call, + // and return true + assertTrue(testTabletServerWithStuckWatcherDies()); + } else { + // This test should time out + IllegalStateException e = + assertThrows(IllegalStateException.class, () -> testTabletServerWithStuckWatcherDies()); + assertTrue(e.getMessage().contains("Timeout exceeded")); + } + } + + @Test + public void testTwo() throws Exception { + if (USE_VERIFICATION_THREAD.get()) { + // This test should use the verification thread, which should + // end the TabletServer, throw an Exception on the ping call, + // and return true + assertTrue(testTabletServerWithStuckWatcherDies()); + } else { + // This test should time out + IllegalStateException e = + assertThrows(IllegalStateException.class, () -> testTabletServerWithStuckWatcherDies()); + assertTrue(e.getMessage().contains("Timeout exceeded")); + } + } + + public boolean testTabletServerWithStuckWatcherDies() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; - client.tableOperations().create(tableName); + - // add splits to the table, which should set a StuckWatcher on the table node in zookeeper - TreeSet<Text> splits = new TreeSet<>(); - splits.add(new Text("j")); - splits.add(new Text("t")); - client.tableOperations().addSplits(tableName, splits); ++ client.tableOperations().create(tableName, ++ new NewTableConfiguration().withInitialTabletAvailability(TabletAvailability.HOSTED)); ++ ++ // Wait a minute, the evaluator thread runs on a 30s interval to set the StuckWatcher ++ Thread.sleep(60_000); + + // delete the table, 
which should invoke the watcher + client.tableOperations().delete(tableName); + - final List<String> tservers = client.instanceOperations().getTabletServers(); ++ final Set<ServerId> tservers = ++ client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); + assertEquals(1, tservers.size()); + ++ ServerId tserver = tservers.iterator().next(); ++ + // Delete the lock for the TabletServer + final ServerContext ctx = getServerContext(); - final String zooRoot = ctx.getZooKeeperRoot(); - ctx.getZooReaderWriter().recursiveDelete( - zooRoot + Constants.ZTSERVERS + "/" + tservers.get(0), NodeMissingPolicy.FAIL); - - Wait.waitFor(() -> pingServer(client, tservers.get(0)) == false, 60_000); ++ Set<ServiceLockPath> serverPaths = ctx.getServerPaths().getTabletServer( ++ (rg) -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), ++ AddressSelector.exact(HostAndPort.fromString(tserver.toHostPortString())), true); ++ assertEquals(1, serverPaths.size()); ++ ctx.getZooReaderWriter().recursiveDelete(serverPaths.iterator().next().toString(), ++ NodeMissingPolicy.FAIL); ++ ++ Wait.waitFor(() -> pingServer(client, tserver.toHostPortString()) == false, 60_000); + return true; + } + + } + + private boolean pingServer(AccumuloClient client, String server) { + final boolean lockVerificationThreadInUse = USE_VERIFICATION_THREAD.get(); + try { + client.instanceOperations().ping(server); + return true; + } catch (AccumuloException e) { + if (lockVerificationThreadInUse) { + // If the lock verification thread is in use, the TabletServer + // should shut down and the call to ping will throw an Exception + return false; + } else { + // With the lock verification thread disabled, the StuckWatcher + // should prevent the TabletServer from shutting down during + // this test method. + fail("TabletServer unexpectedly shut down"); + return false; + } + } + + } + + }