This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 96c45827942514b4dcf1aa874eb301d0dc6064c3
Merge: 867d0eb6a6 be9fa22956
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Sat Dec 7 15:52:22 2024 +0000

    Merge branch '2.1' into 3.1

 .../org/apache/accumulo/core/conf/Property.java    |   5 +
 .../org/apache/accumulo/core/lock/ServiceLock.java |  23 +++
 .../org/apache/accumulo/server/AbstractServer.java |  70 ++++++-
 .../coordinator/CompactionCoordinator.java         |   5 +
 .../org/apache/accumulo/compactor/Compactor.java   |   6 +
 .../apache/accumulo/gc/SimpleGarbageCollector.java |  11 +-
 .../java/org/apache/accumulo/manager/Manager.java  |   7 +
 .../java/org/apache/accumulo/monitor/Monitor.java  |   5 +
 .../org/apache/accumulo/tserver/TabletServer.java  |   4 +-
 .../test/functional/HalfDeadServerWatcherIT.java   | 216 +++++++++++++++++++++
 .../apache/accumulo/test/lock/ServiceLockIT.java   |  24 ++-
 11 files changed, 366 insertions(+), 10 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 32d08169bc,b1a658ea2f..6b70bd059d
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -341,12 -329,24 +341,17 @@@ public enum Property 
            + " was changed and it now can accept multiple class names. The 
metrics spi was introduced in 2.1.3,"
            + " the deprecated factory is 
org.apache.accumulo.core.metrics.MeterRegistryFactory.",
        "2.1.0"),
 -  
GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval",
 "0",
 +  GENERAL_PROCESS_BIND_ADDRESS("general.process.bind.addr", "0.0.0.0", 
PropertyType.STRING,
 +      "The local IP address to which this server should bind for sending and 
receiving network traffic.",
 +      "3.0.0"),
++  
GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval",
 "2m",
+       PropertyType.TIMEDURATION,
+       "Interval at which the Manager and TabletServer should verify their 
server locks. A value of zero"
 -          + " disables this check.",
++          + " disables this check. The default value changed from 0 to 2m in 
3.1.0.",
+       "2.1.4"),
    // properties that are specific to manager server behavior
    MANAGER_PREFIX("manager.", null, PropertyType.PREFIX,
 -      "Properties in this category affect the behavior of the manager server. 
"
 -          + "Since 2.1.0, all properties in this category replace the old 
`master.*` names.",
 -      "2.1.0"),
 -  @Deprecated(since = "2.1.0")
 -  @ReplacedBy(property = Property.MANAGER_PREFIX)
 -  MASTER_PREFIX("master.", null, PropertyType.PREFIX,
 -      "Properties in this category affect the behavior of the manager 
(formerly named master) server. "
 -          + "Since 2.1.0, all properties in this category are deprecated and 
replaced with corresponding "
 -          + "`manager.*` properties. The old `master.*` names can still be 
used until at release 3.0, but a warning "
 -          + "will be emitted. Configuration files should be updated to use 
the new property names.",
 -      "1.3.5"),
 +      "Properties in this category affect the behavior of the manager 
server.", "2.1.0"),
    MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT,
        "The port used for handling client connections on the manager.", 
"1.3.5"),
    MANAGER_TABLET_BALANCER("manager.tablet.balancer",
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index 47fe34bf26,c65314a0f1..3cc20c1e22
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@@ -18,21 -18,20 +18,25 @@@
   */
  package org.apache.accumulo.server;
  
 -import java.util.Objects;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +
+ import java.util.OptionalInt;
 -import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.ScheduledFuture;
  import java.util.concurrent.atomic.AtomicReference;
  
  import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.classloader.ClassLoaderUtil;
 +import org.apache.accumulo.core.cli.ConfigOpts;
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.conf.Property;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
++import org.apache.accumulo.core.lock.ServiceLock;
  import org.apache.accumulo.core.metrics.MetricsProducer;
  import org.apache.accumulo.core.trace.TraceUtil;
+ import org.apache.accumulo.core.util.Halt;
 +import org.apache.accumulo.core.util.Timer;
 +import org.apache.accumulo.core.util.threads.ThreadPools;
+ import org.apache.accumulo.core.util.threads.Threads;
 +import org.apache.accumulo.server.mem.LowMemoryDetector;
  import org.apache.accumulo.server.metrics.ProcessMetrics;
  import org.apache.accumulo.server.security.SecurityUtil;
  import org.slf4j.Logger;
@@@ -45,18 -46,21 +51,21 @@@ public abstract class AbstractServer im
    private final ServerContext context;
    protected final String applicationName;
    private final String hostname;
+   private final Logger log;
    private final ProcessMetrics processMetrics;
 -  protected final long idleReportingPeriodNanos;
 -  private volatile long idlePeriodStartNanos = 0L;
 +  protected final long idleReportingPeriodMillis;
 +  private volatile Timer idlePeriodTimer = null;
+   private volatile Thread serverThread;
+   private volatile Thread verificationThread;
  
 -  protected AbstractServer(String appName, ServerOpts opts, String[] args) {
 -    this.log = LoggerFactory.getLogger(getClass().getName());
 +  protected AbstractServer(String appName, ConfigOpts opts, String[] args) {
      this.applicationName = appName;
      opts.parseArgs(appName, args);
 -    this.hostname = Objects.requireNonNull(opts.getAddress());
      var siteConfig = opts.getSiteConfiguration();
 +    this.hostname = siteConfig.get(Property.GENERAL_PROCESS_BIND_ADDRESS);
      SecurityUtil.serverLogin(siteConfig);
      context = new ServerContext(siteConfig);
-     Logger log = LoggerFactory.getLogger(getClass());
++    log = LoggerFactory.getLogger(getClass());
      log.info("Version " + Constants.VERSION);
      log.info("Instance " + context.getInstanceID());
      context.init(appName);
diff --cc 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index c128c19fea,b735d8544d..fd4e27fc65
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@@ -763,8 -768,13 +763,13 @@@ public class CompactionCoordinator exte
      }
    }
  
+   @Override
+   public ServiceLock getLock() {
+     return coordinatorLock;
+   }
+ 
    public static void main(String[] args) throws Exception {
 -    try (CompactionCoordinator compactor = new CompactionCoordinator(new 
ServerOpts(), args)) {
 +    try (CompactionCoordinator compactor = new CompactionCoordinator(new 
ConfigOpts(), args)) {
        compactor.runServer();
      }
    }
diff --cc 
server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 077956b9bd,bd78388836..62c2280b84
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@@ -77,8 -78,9 +77,9 @@@ public class SimpleGarbageCollector ext
        new GCStatus(new GcCycleStats(), new GcCycleStats(), new 
GcCycleStats(), new GcCycleStats());
  
    private final GcCycleMetrics gcCycleMetrics = new GcCycleMetrics();
+   private ServiceLock gcLock;
  
 -  SimpleGarbageCollector(ServerOpts opts, String[] args) {
 +  SimpleGarbageCollector(ConfigOpts opts, String[] args) {
      super("gc", opts, args);
  
      final AccumuloConfiguration conf = getConfiguration();
@@@ -350,11 -380,10 +351,10 @@@
      };
  
      UUID zooLockUUID = UUID.randomUUID();
-     ServiceLock lock =
-         new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), 
path, zooLockUUID);
+     gcLock = new 
ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path, 
zooLockUUID);
      while (true) {
-       if (lock.tryLock(lockWatcher,
+       if (gcLock.tryLock(lockWatcher,
 -          new ServerServices(addr.toString(), 
Service.GC_CLIENT).toString().getBytes(UTF_8))) {
 +          new ServiceLockData(zooLockUUID, addr.toString(), 
ThriftService.GC))) {
          log.debug("Got GC ZooKeeper lock");
          return;
        }
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/HalfDeadServerWatcherIT.java
index 0000000000,b0b8162348..83dd3c83a4
mode 000000,100644..100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadServerWatcherIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadServerWatcherIT.java
@@@ -1,0 -1,216 +1,216 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.test.functional;
+ 
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertThrows;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
+ import static org.junit.jupiter.api.Assertions.fail;
+ 
+ import java.io.IOException;
+ import java.util.List;
+ import java.util.TreeMap;
+ import java.util.TreeSet;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ 
+ import org.apache.accumulo.core.Constants;
++import org.apache.accumulo.core.cli.ConfigOpts;
+ import org.apache.accumulo.core.client.Accumulo;
+ import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.AccumuloException;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.TableId;
+ import org.apache.accumulo.core.dataImpl.KeyExtent;
+ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+ import org.apache.accumulo.core.util.UtilWaitThread;
+ import org.apache.accumulo.harness.AccumuloClusterHarness;
+ import org.apache.accumulo.minicluster.ServerType;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+ import org.apache.accumulo.server.ServerContext;
 -import org.apache.accumulo.server.ServerOpts;
+ import org.apache.accumulo.test.util.Wait;
+ import org.apache.accumulo.tserver.TabletServer;
+ import org.apache.accumulo.tserver.tablet.Tablet;
+ import org.apache.accumulo.tserver.tablet.TabletData;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.io.Text;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.WatchedEvent;
+ import org.apache.zookeeper.Watcher;
+ import org.junit.jupiter.api.AfterEach;
+ import org.junit.jupiter.api.Test;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * Test that validates that the TabletServer will be terminated when the lock 
is removed in
+  * ZooKeeper, but a Watcher in the TabletServer is preventing the LockWatcher 
from being invoked.
+  */
+ public class HalfDeadServerWatcherIT extends AccumuloClusterHarness {
+ 
+   public static class HalfDeadTabletServer extends TabletServer {
+ 
+     private static final Logger LOG = 
LoggerFactory.getLogger(HalfDeadTabletServer.class);
+ 
+     public static void main(String[] args) throws Exception {
 -      try (HalfDeadTabletServer tserver = new HalfDeadTabletServer(new 
ServerOpts(), args)) {
++      try (HalfDeadTabletServer tserver = new HalfDeadTabletServer(new 
ConfigOpts(), args)) {
+         tserver.runServer();
+       }
+     }
+ 
+     public static class StuckWatcher implements Watcher {
+       private static final Logger LOG = 
LoggerFactory.getLogger(StuckWatcher.class);
+ 
+       @Override
+       public void process(WatchedEvent event) {
+         LOG.info("started sleeping...");
+         while (true) {
+           LOG.info("still sleeping...");
+           UtilWaitThread.sleep(2000);
+         }
+       }
+ 
+     }
+ 
 -    protected HalfDeadTabletServer(ServerOpts opts, String[] args) {
++    protected HalfDeadTabletServer(ConfigOpts opts, String[] args) {
+       super(opts, args);
+     }
+ 
+     @Override
+     protected TreeMap<KeyExtent,TabletData> splitTablet(Tablet tablet, byte[] 
splitPoint)
+         throws IOException {
+       LOG.info("In HalfDeadServerWatcherIT::splitTablet");
+       TreeMap<KeyExtent,TabletData> results = super.splitTablet(tablet, 
splitPoint);
+       if (!tablet.getExtent().isMeta()) {
+         final TableId tid = tablet.getExtent().tableId();
+         final String zooRoot = this.getContext().getZooKeeperRoot();
+         final String tableZPath = zooRoot + Constants.ZTABLES + "/" + 
tid.canonical();
+         try {
+           this.getContext().getZooReaderWriter().exists(tableZPath, new 
StuckWatcher());
+         } catch (KeeperException | InterruptedException e) {
+           LOG.error("Error setting watch at: {}", tableZPath, e);
+         }
+         LOG.info("Set StuckWatcher at: {}", tableZPath);
+       }
+       return results;
+     }
+   }
+ 
+   private static final AtomicBoolean USE_VERIFICATION_THREAD = new 
AtomicBoolean(false);
+ 
+   @Override
+   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+     if (USE_VERIFICATION_THREAD.get()) {
+       cfg.setProperty(Property.GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL, 
"10s");
+     } else {
+       cfg.setProperty(Property.GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL, 
"0");
+     }
+     cfg.setServerClass(ServerType.TABLET_SERVER, HalfDeadTabletServer.class);
+     cfg.setNumCompactors(0);
+     cfg.setNumScanServers(0);
+     cfg.setNumTservers(1);
+   }
+ 
+   @AfterEach
+   public void afterTest() throws Exception {
+     getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+     super.teardownCluster();
+     USE_VERIFICATION_THREAD.set(!USE_VERIFICATION_THREAD.get());
+   }
+ 
+   @Test
+   public void testOne() throws Exception {
+     if (USE_VERIFICATION_THREAD.get()) {
+       // This test should use the verification thread, which should
+       // end the TabletServer, throw an Exception on the ping call,
+       // and return true
+       assertTrue(testTabletServerWithStuckWatcherDies());
+     } else {
+       // This test should time out
+       IllegalStateException e =
+           assertThrows(IllegalStateException.class, () -> 
testTabletServerWithStuckWatcherDies());
+       assertTrue(e.getMessage().contains("Timeout exceeded"));
+     }
+   }
+ 
+   @Test
+   public void testTwo() throws Exception {
+     if (USE_VERIFICATION_THREAD.get()) {
+       // This test should use the verification thread, which should
+       // end the TabletServer, throw an Exception on the ping call,
+       // and return true
+       assertTrue(testTabletServerWithStuckWatcherDies());
+     } else {
+       // This test should time out
+       IllegalStateException e =
+           assertThrows(IllegalStateException.class, () -> 
testTabletServerWithStuckWatcherDies());
+       assertTrue(e.getMessage().contains("Timeout exceeded"));
+     }
+   }
+ 
+   public boolean testTabletServerWithStuckWatcherDies() throws Exception {
+     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+       String tableName = getUniqueNames(1)[0];
+       client.tableOperations().create(tableName);
+ 
+       // add splits to the table, which should set a StuckWatcher on the 
table node in zookeeper
+       TreeSet<Text> splits = new TreeSet<>();
+       splits.add(new Text("j"));
+       splits.add(new Text("t"));
+       client.tableOperations().addSplits(tableName, splits);
+ 
+       // delete the table, which should invoke the watcher
+       client.tableOperations().delete(tableName);
+ 
+       final List<String> tservers = 
client.instanceOperations().getTabletServers();
+       assertEquals(1, tservers.size());
+ 
+       // Delete the lock for the TabletServer
+       final ServerContext ctx = getServerContext();
+       final String zooRoot = ctx.getZooKeeperRoot();
+       ctx.getZooReaderWriter().recursiveDelete(
+           zooRoot + Constants.ZTSERVERS + "/" + tservers.get(0), 
NodeMissingPolicy.FAIL);
+ 
+       Wait.waitFor(() -> pingServer(client, tservers.get(0)) == false, 
60_000);
+       return true;
+     }
+ 
+   }
+ 
+   private boolean pingServer(AccumuloClient client, String server) {
+     final boolean lockVerificationThreadInUse = USE_VERIFICATION_THREAD.get();
+     try {
+       client.instanceOperations().ping(server);
+       return true;
+     } catch (AccumuloException e) {
+       if (lockVerificationThreadInUse) {
+         // If the lock verification thread is in use, then the TabletServer
+         // should shut down and the call to ping will throw an Exception
+         return false;
+       } else {
+         // With the lock verification thread disabled, the StuckWatcher
+         // should prevent the TabletServer from shutting down during
+         // this test method.
+         fail("TabletServer unexpectedly shut down");
+         return false;
+       }
+     }
+ 
+   }
+ 
+ }

Reply via email to