This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 96c45827942514b4dcf1aa874eb301d0dc6064c3 Merge: 867d0eb6a6 be9fa22956 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Sat Dec 7 15:52:22 2024 +0000 Merge branch '2.1' into 3.1 .../org/apache/accumulo/core/conf/Property.java | 5 + .../org/apache/accumulo/core/lock/ServiceLock.java | 23 +++ .../org/apache/accumulo/server/AbstractServer.java | 70 ++++++- .../coordinator/CompactionCoordinator.java | 5 + .../org/apache/accumulo/compactor/Compactor.java | 6 + .../apache/accumulo/gc/SimpleGarbageCollector.java | 11 +- .../java/org/apache/accumulo/manager/Manager.java | 7 + .../java/org/apache/accumulo/monitor/Monitor.java | 5 + .../org/apache/accumulo/tserver/TabletServer.java | 4 +- .../test/functional/HalfDeadServerWatcherIT.java | 216 +++++++++++++++++++++ .../apache/accumulo/test/lock/ServiceLockIT.java | 24 ++- 11 files changed, 366 insertions(+), 10 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index 32d08169bc,b1a658ea2f..6b70bd059d --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -341,12 -329,24 +341,17 @@@ public enum Property + " was changed and it now can accept multiple class names. The metrics spi was introduced in 2.1.3," + " the deprecated factory is org.apache.accumulo.core.metrics.MeterRegistryFactory.", "2.1.0"), - GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval", "0", + GENERAL_PROCESS_BIND_ADDRESS("general.process.bind.addr", "0.0.0.0", PropertyType.STRING, + "The local IP address to which this server should bind for sending and receiving network traffic.", + "3.0.0"), ++ GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval", "2m", + PropertyType.TIMEDURATION, + "Interval at which the Manager and TabletServer should verify their server locks. A value of zero" - + " disables this check.", ++ + " disables this check. 
The default value change from 0 to 2m in 3.1.0.", + "2.1.4"), // properties that are specific to manager server behavior MANAGER_PREFIX("manager.", null, PropertyType.PREFIX, - "Properties in this category affect the behavior of the manager server. " - + "Since 2.1.0, all properties in this category replace the old `master.*` names.", - "2.1.0"), - @Deprecated(since = "2.1.0") - @ReplacedBy(property = Property.MANAGER_PREFIX) - MASTER_PREFIX("master.", null, PropertyType.PREFIX, - "Properties in this category affect the behavior of the manager (formerly named master) server. " - + "Since 2.1.0, all properties in this category are deprecated and replaced with corresponding " - + "`manager.*` properties. The old `master.*` names can still be used until at release 3.0, but a warning " - + "will be emitted. Configuration files should be updated to use the new property names.", - "1.3.5"), + "Properties in this category affect the behavior of the manager server.", "2.1.0"), MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT, "The port used for handling client connections on the manager.", "1.3.5"), MANAGER_TABLET_BALANCER("manager.tablet.balancer", diff --cc server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index 47fe34bf26,c65314a0f1..3cc20c1e22 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@@ -18,21 -18,20 +18,25 @@@ */ package org.apache.accumulo.server; -import java.util.Objects; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + + import java.util.OptionalInt; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.classloader.ClassLoaderUtil; +import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.conf.AccumuloConfiguration; 
import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; ++import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.trace.TraceUtil; + import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.threads.ThreadPools; + import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.server.mem.LowMemoryDetector; import org.apache.accumulo.server.metrics.ProcessMetrics; import org.apache.accumulo.server.security.SecurityUtil; import org.slf4j.Logger; @@@ -45,18 -46,21 +51,21 @@@ public abstract class AbstractServer im private final ServerContext context; protected final String applicationName; private final String hostname; + private final Logger log; private final ProcessMetrics processMetrics; - protected final long idleReportingPeriodNanos; - private volatile long idlePeriodStartNanos = 0L; + protected final long idleReportingPeriodMillis; + private volatile Timer idlePeriodTimer = null; + private volatile Thread serverThread; + private volatile Thread verificationThread; - protected AbstractServer(String appName, ServerOpts opts, String[] args) { - this.log = LoggerFactory.getLogger(getClass().getName()); + protected AbstractServer(String appName, ConfigOpts opts, String[] args) { this.applicationName = appName; opts.parseArgs(appName, args); - this.hostname = Objects.requireNonNull(opts.getAddress()); var siteConfig = opts.getSiteConfiguration(); + this.hostname = siteConfig.get(Property.GENERAL_PROCESS_BIND_ADDRESS); SecurityUtil.serverLogin(siteConfig); context = new ServerContext(siteConfig); - Logger log = LoggerFactory.getLogger(getClass()); ++ log = LoggerFactory.getLogger(getClass()); log.info("Version " + Constants.VERSION); log.info("Instance " + context.getInstanceID()); context.init(appName); diff --cc 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index c128c19fea,b735d8544d..fd4e27fc65 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@@ -763,8 -768,13 +763,13 @@@ public class CompactionCoordinator exte } } + @Override + public ServiceLock getLock() { + return coordinatorLock; + } + public static void main(String[] args) throws Exception { - try (CompactionCoordinator compactor = new CompactionCoordinator(new ServerOpts(), args)) { + try (CompactionCoordinator compactor = new CompactionCoordinator(new ConfigOpts(), args)) { compactor.runServer(); } } diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 077956b9bd,bd78388836..62c2280b84 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@@ -77,8 -78,9 +77,9 @@@ public class SimpleGarbageCollector ext new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats()); private final GcCycleMetrics gcCycleMetrics = new GcCycleMetrics(); + private ServiceLock gcLock; - SimpleGarbageCollector(ServerOpts opts, String[] args) { + SimpleGarbageCollector(ConfigOpts opts, String[] args) { super("gc", opts, args); final AccumuloConfiguration conf = getConfiguration(); @@@ -350,11 -380,10 +351,10 @@@ }; UUID zooLockUUID = UUID.randomUUID(); - ServiceLock lock = - new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); + gcLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, zooLockUUID); while (true) { - if (lock.tryLock(lockWatcher, + if (gcLock.tryLock(lockWatcher, - new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes(UTF_8))) { + new 
ServiceLockData(zooLockUUID, addr.toString(), ThriftService.GC))) { log.debug("Got GC ZooKeeper lock"); return; } diff --cc test/src/main/java/org/apache/accumulo/test/functional/HalfDeadServerWatcherIT.java index 0000000000,b0b8162348..83dd3c83a4 mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadServerWatcherIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadServerWatcherIT.java @@@ -1,0 -1,216 +1,216 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.accumulo.test.functional; + + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertThrows; + import static org.junit.jupiter.api.Assertions.assertTrue; + import static org.junit.jupiter.api.Assertions.fail; + + import java.io.IOException; + import java.util.List; + import java.util.TreeMap; + import java.util.TreeSet; + import java.util.concurrent.atomic.AtomicBoolean; + + import org.apache.accumulo.core.Constants; ++import org.apache.accumulo.core.cli.ConfigOpts; + import org.apache.accumulo.core.client.Accumulo; + import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.AccumuloException; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.TableId; + import org.apache.accumulo.core.dataImpl.KeyExtent; + import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; + import org.apache.accumulo.core.util.UtilWaitThread; + import org.apache.accumulo.harness.AccumuloClusterHarness; + import org.apache.accumulo.minicluster.ServerType; + import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; + import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.ServerOpts; + import org.apache.accumulo.test.util.Wait; + import org.apache.accumulo.tserver.TabletServer; + import org.apache.accumulo.tserver.tablet.Tablet; + import org.apache.accumulo.tserver.tablet.TabletData; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.io.Text; + import org.apache.zookeeper.KeeperException; + import org.apache.zookeeper.WatchedEvent; + import org.apache.zookeeper.Watcher; + import org.junit.jupiter.api.AfterEach; + import org.junit.jupiter.api.Test; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * Test that validates that the TabletServer will be terminated when the lock is removed in + * ZooKeeper, but a Watcher in the TabletServer is 
preventing the LockWatcher from being invoked. + */ + public class HalfDeadServerWatcherIT extends AccumuloClusterHarness { + + public static class HalfDeadTabletServer extends TabletServer { + + private static final Logger LOG = LoggerFactory.getLogger(HalfDeadTabletServer.class); + + public static void main(String[] args) throws Exception { - try (HalfDeadTabletServer tserver = new HalfDeadTabletServer(new ServerOpts(), args)) { ++ try (HalfDeadTabletServer tserver = new HalfDeadTabletServer(new ConfigOpts(), args)) { + tserver.runServer(); + } + } + + public static class StuckWatcher implements Watcher { + private static final Logger LOG = LoggerFactory.getLogger(StuckWatcher.class); + + @Override + public void process(WatchedEvent event) { + LOG.info("started sleeping..."); + while (true) { + LOG.info("still sleeping..."); + UtilWaitThread.sleep(2000); + } + } + + } + - protected HalfDeadTabletServer(ServerOpts opts, String[] args) { ++ protected HalfDeadTabletServer(ConfigOpts opts, String[] args) { + super(opts, args); + } + + @Override + protected TreeMap<KeyExtent,TabletData> splitTablet(Tablet tablet, byte[] splitPoint) + throws IOException { + LOG.info("In HalfDeadServerWatcherIT::splitTablet"); + TreeMap<KeyExtent,TabletData> results = super.splitTablet(tablet, splitPoint); + if (!tablet.getExtent().isMeta()) { + final TableId tid = tablet.getExtent().tableId(); + final String zooRoot = this.getContext().getZooKeeperRoot(); + final String tableZPath = zooRoot + Constants.ZTABLES + "/" + tid.canonical(); + try { + this.getContext().getZooReaderWriter().exists(tableZPath, new StuckWatcher()); + } catch (KeeperException | InterruptedException e) { + LOG.error("Error setting watch at: {}", tableZPath, e); + } + LOG.info("Set StuckWatcher at: {}", tableZPath); + } + return results; + } + } + + private static final AtomicBoolean USE_VERIFICATION_THREAD = new AtomicBoolean(false); + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
Configuration hadoopCoreSite) { + if (USE_VERIFICATION_THREAD.get()) { + cfg.setProperty(Property.GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL, "10s"); + } else { + cfg.setProperty(Property.GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL, "0"); + } + cfg.setServerClass(ServerType.TABLET_SERVER, HalfDeadTabletServer.class); + cfg.setNumCompactors(0); + cfg.setNumScanServers(0); + cfg.setNumTservers(1); + } + + @AfterEach + public void afterTest() throws Exception { + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + super.teardownCluster(); + USE_VERIFICATION_THREAD.set(!USE_VERIFICATION_THREAD.get()); + } + + @Test + public void testOne() throws Exception { + if (USE_VERIFICATION_THREAD.get()) { + // This test should use the verification thread, which should + // end the TabletServer, throw an Exception on the ping call, + // and return true + assertTrue(testTabletServerWithStuckWatcherDies()); + } else { + // This test should time out + IllegalStateException e = + assertThrows(IllegalStateException.class, () -> testTabletServerWithStuckWatcherDies()); + assertTrue(e.getMessage().contains("Timeout exceeded")); + } + } + + @Test + public void testTwo() throws Exception { + if (USE_VERIFICATION_THREAD.get()) { + // This test should use the verification thread, which should + // end the TabletServer, throw an Exception on the ping call, + // and return true + assertTrue(testTabletServerWithStuckWatcherDies()); + } else { + // This test should time out + IllegalStateException e = + assertThrows(IllegalStateException.class, () -> testTabletServerWithStuckWatcherDies()); + assertTrue(e.getMessage().contains("Timeout exceeded")); + } + } + + public boolean testTabletServerWithStuckWatcherDies() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName); + + // add splits to the table, which should set a StuckWatcher on the 
table node in zookeeper + TreeSet<Text> splits = new TreeSet<>(); + splits.add(new Text("j")); + splits.add(new Text("t")); + client.tableOperations().addSplits(tableName, splits); + + // delete the table, which should invoke the watcher + client.tableOperations().delete(tableName); + + final List<String> tservers = client.instanceOperations().getTabletServers(); + assertEquals(1, tservers.size()); + + // Delete the lock for the TabletServer + final ServerContext ctx = getServerContext(); + final String zooRoot = ctx.getZooKeeperRoot(); + ctx.getZooReaderWriter().recursiveDelete( + zooRoot + Constants.ZTSERVERS + "/" + tservers.get(0), NodeMissingPolicy.FAIL); + + Wait.waitFor(() -> pingServer(client, tservers.get(0)) == false, 60_000); + return true; + } + + } + + private boolean pingServer(AccumuloClient client, String server) { + final boolean lockVerificationThreadInUse = USE_VERIFICATION_THREAD.get(); + try { + client.instanceOperations().ping(server); + return true; + } catch (AccumuloException e) { + if (lockVerificationThreadInUse) { + // If the lock verification thread is in use, the TabletServer + // should shut down and the call to ping will throw an Exception + return false; + } else { + // With the lock verification thread disabled, the StuckWatcher + // should prevent the TabletServer from shutting down during + // this test method. + fail("TabletServer unexpectedly shut down"); + return false; + } + } + + } + + }