This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 5b9910f5d43f52e4df2badbab020bb1ea6afbd75
Merge: 3787f1b49d 96c4582794
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Sat Dec 7 17:33:13 2024 +0000

    Merge branch '3.1'

 .../java/org/apache/accumulo/core/Constants.java   |   2 -
 .../org/apache/accumulo/core/conf/Property.java    |   5 +
 .../org/apache/accumulo/core/lock/ServiceLock.java |  23 ++
 .../apache/accumulo/core/logging/TabletLogger.java |  13 +
 .../core/metadata/schema/MetadataSchema.java       |  16 -
 .../org/apache/accumulo/server/AbstractServer.java |  70 ++++-
 .../accumulo/server/compaction/FileCompactor.java  |  12 +-
 .../org/apache/accumulo/server/fs/FileManager.java |  15 +-
 .../accumulo/server/init/ZooKeeperInitializer.java |   2 -
 .../accumulo/server/problems/ProblemReport.java    | 252 ----------------
 .../server/problems/ProblemReportingIterator.java  |  24 +-
 .../accumulo/server/problems/ProblemReports.java   | 323 ---------------------
 .../accumulo/server/problems/ProblemType.java      |  23 --
 .../server/problems/ProblemReportTest.java         | 205 -------------
 .../problems/ProblemReportingIteratorTest.java     |   2 +-
 .../org/apache/accumulo/compactor/Compactor.java   |   4 +
 .../apache/accumulo/gc/SimpleGarbageCollector.java |   7 +-
 .../java/org/apache/accumulo/manager/Manager.java  |   6 +
 .../accumulo/manager/tableOps/delete/CleanUp.java  |   8 -
 .../accumulo/manager/upgrade/Upgrader11to12.java   | 147 ++++++++++
 .../manager/upgrade/Upgrader11to12Test.java        |   2 +
 .../java/org/apache/accumulo/monitor/Monitor.java  |  26 +-
 .../monitor/rest/problems/ProblemDetail.java       |  42 ---
 .../rest/problems/ProblemDetailInformation.java    |  69 -----
 .../monitor/rest/problems/ProblemSummary.java      |  42 ---
 .../rest/problems/ProblemSummaryInformation.java   |  62 ----
 .../monitor/rest/problems/ProblemsResource.java    | 185 ------------
 .../monitor/rest/status/StatusInformation.java     |   6 +-
 .../monitor/rest/status/StatusResource.java        |   3 +-
 .../accumulo/monitor/resources/js/functions.js     |  40 +--
 .../apache/accumulo/monitor/resources/js/navbar.js |  34 ---
 .../accumulo/monitor/resources/js/problems.js      | 167 -----------
 .../apache/accumulo/monitor/templates/navbar.ftl   |   8 -
 .../apache/accumulo/monitor/templates/problems.ftl |  71 -----
 .../apache/accumulo/tserver/AssignmentHandler.java |  19 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |   1 +
 .../accumulo/tserver/tablet/MinorCompactor.java    |  15 -
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   5 -
 .../test/functional/HalfDeadServerWatcherIT.java   | 225 ++++++++++++++
 .../apache/accumulo/test/lock/ServiceLockIT.java   |  24 +-
 40 files changed, 550 insertions(+), 1655 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/Constants.java
index 28a9d8ea36,e3480686d7..6270fcb6c1
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@@ -74,8 -75,8 +74,6 @@@ public class Constants 
    public static final String ZDEAD = "/dead";
    public static final String ZDEADTSERVERS = ZDEAD + "/tservers";
  
-   public static final String ZPROBLEMS = "/problems";
 -  public static final String BULK_ARBITRATOR_TYPE = "bulkTx";
--
    public static final String ZFATE = "/fate";
  
    public static final String ZNEXT_FILE = "/next_file";
diff --cc core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
index 40dc0b5eb1,0b6849216a..c9716b561e
--- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
@@@ -23,14 -23,13 +23,15 @@@ import static java.util.stream.Collecto
  import java.util.Collection;
  import java.util.List;
  import java.util.Optional;
 +import java.util.SortedSet;
  import java.util.UUID;
  
 -import org.apache.accumulo.core.client.admin.CompactionConfig;
  import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+ import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.fate.FateId;
  import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
  import org.apache.accumulo.core.metadata.StoredTabletFile;
  import org.apache.accumulo.core.metadata.TServerInstance;
  import org.apache.accumulo.core.metadata.TabletFile;
diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 9374364a07,b7269eb494..5360c98274
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@@ -529,20 -415,16 +529,4 @@@ public class MetadataSchema 
  
    }
  
-   /**
-    * Holds error message processing flags
-    */
-   public static class ProblemSection {
 -  public static class ExternalCompactionSection {
--    private static final Section section =
-         new Section(RESERVED_PREFIX + "err_", true, RESERVED_PREFIX + "err`", 
false);
 -        new Section(RESERVED_PREFIX + "ecomp", true, RESERVED_PREFIX + 
"ecomq", false);
--
--    public static Range getRange() {
--      return section.getRange();
--    }
--
--    public static String getRowPrefix() {
--      return section.getRowPrefix();
--    }
- 
--  }
  }
diff --cc server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index ba621f6b01,3cc20c1e22..1dac3b0a60
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@@ -20,20 -20,22 +20,24 @@@ package org.apache.accumulo.server
  
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  
+ import java.util.OptionalInt;
  import java.util.concurrent.ScheduledFuture;
  import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.Function;
  
  import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.classloader.ClassLoaderUtil;
  import org.apache.accumulo.core.cli.ConfigOpts;
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
+ import org.apache.accumulo.core.lock.ServiceLock;
  import org.apache.accumulo.core.metrics.MetricsProducer;
  import org.apache.accumulo.core.trace.TraceUtil;
+ import org.apache.accumulo.core.util.Halt;
  import org.apache.accumulo.core.util.Timer;
  import org.apache.accumulo.core.util.threads.ThreadPools;
+ import org.apache.accumulo.core.util.threads.Threads;
  import org.apache.accumulo.server.mem.LowMemoryDetector;
  import org.apache.accumulo.server.metrics.ProcessMetrics;
  import org.apache.accumulo.server.security.SecurityUtil;
@@@ -47,21 -51,21 +53,24 @@@ public abstract class AbstractServer im
    private final ServerContext context;
    protected final String applicationName;
    private final String hostname;
 +  private final String resourceGroup;
+   private final Logger log;
    private final ProcessMetrics processMetrics;
    protected final long idleReportingPeriodMillis;
    private volatile Timer idlePeriodTimer = null;
+   private volatile Thread serverThread;
+   private volatile Thread verificationThread;
  
 -  protected AbstractServer(String appName, ConfigOpts opts, String[] args) {
 +  protected AbstractServer(String appName, ConfigOpts opts,
 +      Function<SiteConfiguration,ServerContext> serverContextFactory, 
String[] args) {
      this.applicationName = appName;
      opts.parseArgs(appName, args);
      var siteConfig = opts.getSiteConfiguration();
      this.hostname = siteConfig.get(Property.GENERAL_PROCESS_BIND_ADDRESS);
 +    this.resourceGroup = getResourceGroupPropertyValue(siteConfig);
      SecurityUtil.serverLogin(siteConfig);
 -    context = new ServerContext(siteConfig);
 +    context = serverContextFactory.apply(siteConfig);
-     Logger log = LoggerFactory.getLogger(getClass());
+     log = LoggerFactory.getLogger(getClass());
      log.info("Version " + Constants.VERSION);
      log.info("Instance " + context.getInstanceID());
      context.init(appName);
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 3eabcedbf6,b9cded93fa..ca19dec098
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -1018,4 -978,9 +1018,8 @@@ public class Compactor extends Abstract
      }
    }
  
+   @Override
+   public ServiceLock getLock() {
+     return compactorLock;
+   }
 -
  }
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 4ba44a5fef,62c2280b84..65f80b1864
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@@ -89,12 -77,10 +89,12 @@@ public class SimpleGarbageCollector ext
        new GCStatus(new GcCycleStats(), new GcCycleStats(), new 
GcCycleStats(), new GcCycleStats());
  
    private final GcCycleMetrics gcCycleMetrics = new GcCycleMetrics();
- 
    private ServiceLock gcLock;
+ 
 +  private final Timer lastCompactorCheck = Timer.startNew();
 +
    SimpleGarbageCollector(ConfigOpts opts, String[] args) {
 -    super("gc", opts, args);
 +    super("gc", opts, ServerContext::new, args);
  
      final AccumuloConfiguration conf = getConfiguration();
  
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 1593596ccf,dfb672013f..062e306202
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -1858,28 -1934,9 +1859,33 @@@ public class Manager extends AbstractSe
      tabletBalancer.getAssignments(params);
    }
  
 +  public TabletStateStore getTabletStateStore(DataLevel level) {
 +    switch (level) {
 +      case METADATA:
 +        return this.metadataTabletStore;
 +      case ROOT:
 +        return this.rootTabletStore;
 +      case USER:
 +        return this.userTabletStore;
 +      default:
 +        throw new IllegalStateException("Unhandled DataLevel value: " + 
level);
 +    }
 +  }
 +
 +  @Override
 +  public void registerMetrics(MeterRegistry registry) {
 +    super.registerMetrics(registry);
 +    compactionCoordinator.registerMetrics(registry);
 +  }
 +
 +  private Map<FateInstanceType,Fate<Manager>> getFateRefs() {
 +    var fateRefs = this.fateRefs.get();
 +    Preconditions.checkState(fateRefs != null, "Unexpected null fate 
references map");
 +    return fateRefs;
 +  }
++
+   @Override
+   public ServiceLock getLock() {
+     return managerLock;
+   }
 -
  }
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
index 142fe479d5,c45fa1e54e..b8c58164d4
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
@@@ -44,7 -47,7 +44,6 @@@ import org.apache.accumulo.manager.Mana
  import org.apache.accumulo.manager.tableOps.ManagerRepo;
  import org.apache.accumulo.manager.tableOps.Utils;
  import org.apache.accumulo.server.fs.VolumeManager;
- import org.apache.accumulo.server.problems.ProblemReports;
 -import org.apache.accumulo.server.manager.state.MetaDataTableScanner;
  import org.apache.accumulo.server.util.MetadataTableUtil;
  import org.apache.hadoop.fs.Path;
  import org.slf4j.Logger;
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 12aa0989d4,239394f6ab..1ac2914df8
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@@ -82,10 -80,7 +81,8 @@@ import org.apache.accumulo.monitor.rest
  import org.apache.accumulo.server.AbstractServer;
  import org.apache.accumulo.server.HighlyAvailableService;
  import org.apache.accumulo.server.ServerContext;
- import org.apache.accumulo.server.problems.ProblemReports;
- import org.apache.accumulo.server.problems.ProblemType;
  import org.apache.accumulo.server.util.TableInfoUtil;
 +import org.apache.thrift.transport.TTransportException;
  import org.apache.zookeeper.KeeperException;
  import org.eclipse.jetty.servlet.DefaultServlet;
  import org.eclipse.jetty.servlet.ServletHolder;
@@@ -144,12 -139,14 +141,10 @@@ public class Monitor extends AbstractSe
  
    private final AtomicBoolean fetching = new AtomicBoolean(false);
    private ManagerMonitorInfo mmi;
-   private Map<TableId,Map<ProblemType,Integer>> problemSummary = 
Collections.emptyMap();
-   private Exception problemException;
    private GCStatus gcStatus;
 -  private Optional<HostAndPort> coordinatorHost = Optional.empty();
 -  private long coordinatorCheckNanos = 0L;
 -  private CompactionCoordinatorService.Client coordinatorClient;
 +  private volatile Optional<HostAndPort> coordinatorHost = Optional.empty();
    private final String coordinatorMissingMsg =
 -      "Error getting the compaction coordinator. Check that it is running. It 
is not "
 -          + "started automatically with other cluster processes so must be 
started by running "
 -          + "'accumulo compaction-coordinator'.";
 +      "Error getting the compaction coordinator client. Check that the 
Manager is running.";
  
    private EmbeddedWebServer server;
    private int livePort = 0;
@@@ -312,16 -294,23 +307,8 @@@
          this.totalLookups = totalLookups;
  
        }
-       try {
-         this.problemSummary = ProblemReports.getInstance(context).summarize();
-         this.problemException = null;
-       } catch (Exception e) {
-         log.info("Failed to obtain problem reports ", e);
-         this.problemSummary = Collections.emptyMap();
-         this.problemException = e;
-       }
  
 -      // check for compaction coordinator host and only notify its discovery
 -      Optional<HostAndPort> previousHost;
 -      if (System.nanoTime() - coordinatorCheckNanos > fetchTimeNanos) {
 -        previousHost = coordinatorHost;
 -        coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(context);
 -        coordinatorCheckNanos = System.nanoTime();
 -        if (previousHost.isEmpty() && coordinatorHost.isPresent()) {
 -          log.info("External Compaction Coordinator found at {}", 
coordinatorHost.orElseThrow());
 -        }
 -      }
 -
      } finally {
 -      if (coordinatorClient != null) {
 -        ThriftUtil.returnClient(coordinatorClient, context);
 -        coordinatorClient = null;
 -      }
        lastRecalc.set(currentTime);
        // stop fetching; log an error if this thread wasn't already fetching
        if (!fetching.compareAndSet(true, false)) {
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
index ec56611a03,1d6f1fe244..5d55a481fa
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
@@@ -19,9 -19,10 +19,8 @@@
  package org.apache.accumulo.tserver;
  
  import static java.util.concurrent.TimeUnit.MINUTES;
- import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
  
 -import java.util.Arrays;
  import java.util.Set;
 -import java.util.TreeSet;
  import java.util.concurrent.TimeUnit;
  
  import org.apache.accumulo.core.client.AccumuloException;
@@@ -36,13 -37,12 +35,10 @@@ import org.apache.accumulo.core.util.th
  import org.apache.accumulo.core.util.threads.Threads;
  import org.apache.accumulo.server.manager.state.Assignment;
  import org.apache.accumulo.server.manager.state.TabletStateStore;
- import org.apache.accumulo.server.problems.ProblemReport;
- import org.apache.accumulo.server.problems.ProblemReports;
 -import org.apache.accumulo.server.util.ManagerMetadataUtil;
  import 
org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
  import org.apache.accumulo.tserver.managermessage.TabletStatusMessage;
  import org.apache.accumulo.tserver.tablet.Tablet;
 -import org.apache.accumulo.tserver.tablet.TabletData;
 -import org.apache.hadoop.io.Text;
 +import org.apache.accumulo.tserver.tablet.Tablet.RefreshPurpose;
- import org.apache.hadoop.io.Text;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -100,7 -100,7 +96,6 @@@ class AssignmentHandler implements Runn
      }
  
      // check Metadata table before accepting assignment
--    Text locationToOpen = null;
      TabletMetadata tabletMetadata = null;
      boolean canLoad = false;
      try {
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index a022910cd7,7482e75a6e..d69dd1b74b
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@@ -84,15 -92,19 +84,12 @@@ import org.apache.accumulo.core.tablets
  import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
  import org.apache.accumulo.core.trace.TraceUtil;
  import org.apache.accumulo.core.util.Pair;
 -import org.apache.accumulo.core.volume.Volume;
 -import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.accumulo.server.compaction.CompactionStats;
 -import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 -import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
 -import org.apache.accumulo.server.fs.VolumeUtil;
 -import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles;
 +import org.apache.accumulo.server.fs.VolumeManager;
- import org.apache.accumulo.server.problems.ProblemReport;
- import org.apache.accumulo.server.problems.ProblemReports;
- import org.apache.accumulo.server.problems.ProblemType;
 +import 
org.apache.accumulo.server.tablets.ConditionCheckerContext.ConditionChecker;
 +import org.apache.accumulo.server.tablets.TabletNameGenerator;
  import org.apache.accumulo.server.tablets.TabletTime;
 -import org.apache.accumulo.server.tablets.UniqueNameAllocator;
 -import org.apache.accumulo.server.util.FileUtil;
 -import org.apache.accumulo.server.util.ManagerMetadataUtil;
 -import org.apache.accumulo.server.util.MetadataTableUtil;
 -import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker;
  import org.apache.accumulo.tserver.InMemoryMap;
  import org.apache.accumulo.tserver.MinorCompactionReason;
  import org.apache.accumulo.tserver.TabletServer;
diff --cc test/src/main/java/org/apache/accumulo/test/functional/HalfDeadServerWatcherIT.java
index 0000000000,83dd3c83a4..3f791f411b
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadServerWatcherIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadServerWatcherIT.java
@@@ -1,0 -1,216 +1,225 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.test.functional;
+ 
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertThrows;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
+ import static org.junit.jupiter.api.Assertions.fail;
+ 
 -import java.io.IOException;
 -import java.util.List;
 -import java.util.TreeMap;
 -import java.util.TreeSet;
++import java.util.Set;
+ import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.function.Function;
+ 
+ import org.apache.accumulo.core.Constants;
+ import org.apache.accumulo.core.cli.ConfigOpts;
+ import org.apache.accumulo.core.client.Accumulo;
+ import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.AccumuloException;
++import org.apache.accumulo.core.client.admin.NewTableConfiguration;
++import org.apache.accumulo.core.client.admin.TabletAvailability;
++import org.apache.accumulo.core.client.admin.servers.ServerId;
+ import org.apache.accumulo.core.conf.Property;
++import org.apache.accumulo.core.conf.SiteConfiguration;
+ import org.apache.accumulo.core.data.TableId;
 -import org.apache.accumulo.core.dataImpl.KeyExtent;
+ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
++import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
++import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
+ import org.apache.accumulo.core.util.UtilWaitThread;
+ import org.apache.accumulo.harness.AccumuloClusterHarness;
+ import org.apache.accumulo.minicluster.ServerType;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+ import org.apache.accumulo.server.ServerContext;
+ import org.apache.accumulo.test.util.Wait;
+ import org.apache.accumulo.tserver.TabletServer;
 -import org.apache.accumulo.tserver.tablet.Tablet;
 -import org.apache.accumulo.tserver.tablet.TabletData;
+ import org.apache.hadoop.conf.Configuration;
 -import org.apache.hadoop.io.Text;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.WatchedEvent;
+ import org.apache.zookeeper.Watcher;
+ import org.junit.jupiter.api.AfterEach;
+ import org.junit.jupiter.api.Test;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
++import com.google.common.net.HostAndPort;
++
+ /**
+  * Test that validates that the TabletServer will be terminated when the lock 
is removed in
+  * ZooKeeper, but a Watcher in the TabletServer is preventing the LockWatcher 
to be invoked.
+  */
+ public class HalfDeadServerWatcherIT extends AccumuloClusterHarness {
+ 
+   public static class HalfDeadTabletServer extends TabletServer {
+ 
+     private static final Logger LOG = 
LoggerFactory.getLogger(HalfDeadTabletServer.class);
+ 
+     public static void main(String[] args) throws Exception {
 -      try (HalfDeadTabletServer tserver = new HalfDeadTabletServer(new 
ConfigOpts(), args)) {
++      try (HalfDeadTabletServer tserver =
++          new HalfDeadTabletServer(new ConfigOpts(), ServerContext::new, 
args)) {
+         tserver.runServer();
+       }
+     }
+ 
+     public static class StuckWatcher implements Watcher {
+       private static final Logger LOG = 
LoggerFactory.getLogger(StuckWatcher.class);
+ 
+       @Override
+       public void process(WatchedEvent event) {
+         LOG.info("started sleeping...");
+         while (true) {
+           LOG.info("still sleeping...");
+           UtilWaitThread.sleep(2000);
+         }
+       }
+ 
+     }
+ 
 -    protected HalfDeadTabletServer(ConfigOpts opts, String[] args) {
 -      super(opts, args);
++    protected HalfDeadTabletServer(ConfigOpts opts,
++        Function<SiteConfiguration,ServerContext> serverContextFactory, 
String[] args) {
++      super(opts, serverContextFactory, args);
+     }
+ 
+     @Override
 -    protected TreeMap<KeyExtent,TabletData> splitTablet(Tablet tablet, byte[] 
splitPoint)
 -        throws IOException {
 -      LOG.info("In HalfDeadServerWatcherIT::splitTablet");
 -      TreeMap<KeyExtent,TabletData> results = super.splitTablet(tablet, 
splitPoint);
 -      if (!tablet.getExtent().isMeta()) {
 -        final TableId tid = tablet.getExtent().tableId();
 -        final String zooRoot = this.getContext().getZooKeeperRoot();
 -        final String tableZPath = zooRoot + Constants.ZTABLES + "/" + 
tid.canonical();
 -        try {
 -          this.getContext().getZooReaderWriter().exists(tableZPath, new 
StuckWatcher());
 -        } catch (KeeperException | InterruptedException e) {
 -          LOG.error("Error setting watch at: {}", tableZPath, e);
++    public void evaluateOnDemandTabletsForUnload() {
++      super.evaluateOnDemandTabletsForUnload();
++      getOnlineTablets().keySet().forEach(ke -> {
++        if (!ke.isMeta()) {
++          final TableId tid = ke.tableId();
++          final String zooRoot = this.getContext().getZooKeeperRoot();
++          final String tableZPath = zooRoot + Constants.ZTABLES + "/" + 
tid.canonical();
++          try {
++            this.getContext().getZooReaderWriter().exists(tableZPath, new 
StuckWatcher());
++          } catch (KeeperException | InterruptedException e) {
++            LOG.error("Error setting watch at: {}", tableZPath, e);
++          }
++          LOG.info("Set StuckWatcher at: {}", tableZPath);
+         }
 -        LOG.info("Set StuckWatcher at: {}", tableZPath);
 -      }
 -      return results;
++      });
+     }
+   }
+ 
+   private static final AtomicBoolean USE_VERIFICATION_THREAD = new 
AtomicBoolean(false);
+ 
+   @Override
+   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+     if (USE_VERIFICATION_THREAD.get()) {
+       cfg.setProperty(Property.GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL, 
"10s");
+     } else {
+       cfg.setProperty(Property.GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL, 
"0");
+     }
+     cfg.setServerClass(ServerType.TABLET_SERVER, HalfDeadTabletServer.class);
 -    cfg.setNumCompactors(0);
 -    cfg.setNumScanServers(0);
 -    cfg.setNumTservers(1);
++    cfg.setProperty(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL, "30s");
++    cfg.getClusterServerConfiguration().setNumDefaultCompactors(1);
++    cfg.getClusterServerConfiguration().setNumDefaultScanServers(0);
++    cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
+   }
+ 
+   @AfterEach
+   public void afterTest() throws Exception {
+     getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+     super.teardownCluster();
+     USE_VERIFICATION_THREAD.set(!USE_VERIFICATION_THREAD.get());
+   }
+ 
+   @Test
+   public void testOne() throws Exception {
+     if (USE_VERIFICATION_THREAD.get()) {
+       // This test should use the verification thread, which should
+       // end the TabletServer, throw an Exception on the ping call,
+       // and return true
+       assertTrue(testTabletServerWithStuckWatcherDies());
+     } else {
+       // This test should time out
+       IllegalStateException e =
+           assertThrows(IllegalStateException.class, () -> 
testTabletServerWithStuckWatcherDies());
+       assertTrue(e.getMessage().contains("Timeout exceeded"));
+     }
+   }
+ 
+   @Test
+   public void testTwo() throws Exception {
+     if (USE_VERIFICATION_THREAD.get()) {
+       // This test should use the verification thread, which should
+       // end the TabletServer, throw an Exception on the ping call,
+       // and return true
+       assertTrue(testTabletServerWithStuckWatcherDies());
+     } else {
+       // This test should time out
+       IllegalStateException e =
+           assertThrows(IllegalStateException.class, () -> 
testTabletServerWithStuckWatcherDies());
+       assertTrue(e.getMessage().contains("Timeout exceeded"));
+     }
+   }
+ 
+   public boolean testTabletServerWithStuckWatcherDies() throws Exception {
+     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+       String tableName = getUniqueNames(1)[0];
 -      client.tableOperations().create(tableName);
+ 
 -      // add splits to the table, which should set a StuckWatcher on the 
table node in zookeeper
 -      TreeSet<Text> splits = new TreeSet<>();
 -      splits.add(new Text("j"));
 -      splits.add(new Text("t"));
 -      client.tableOperations().addSplits(tableName, splits);
++      client.tableOperations().create(tableName,
++          new 
NewTableConfiguration().withInitialTabletAvailability(TabletAvailability.HOSTED));
++
++      // Wait a minute, the evaluator thread runs on a 30s interval to set 
the StuckWatcher
++      Thread.sleep(60_000);
+ 
+       // delete the table, which should invoke the watcher
+       client.tableOperations().delete(tableName);
+ 
 -      final List<String> tservers = 
client.instanceOperations().getTabletServers();
++      final Set<ServerId> tservers =
++          client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER);
+       assertEquals(1, tservers.size());
+ 
++      ServerId tserver = tservers.iterator().next();
++
+       // Delete the lock for the TabletServer
+       final ServerContext ctx = getServerContext();
 -      final String zooRoot = ctx.getZooKeeperRoot();
 -      ctx.getZooReaderWriter().recursiveDelete(
 -          zooRoot + Constants.ZTSERVERS + "/" + tservers.get(0), 
NodeMissingPolicy.FAIL);
 -
 -      Wait.waitFor(() -> pingServer(client, tservers.get(0)) == false, 
60_000);
++      Set<ServiceLockPath> serverPaths = ctx.getServerPaths().getTabletServer(
++          (rg) -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME),
++          
AddressSelector.exact(HostAndPort.fromString(tserver.toHostPortString())), 
true);
++      assertEquals(1, serverPaths.size());
++      
ctx.getZooReaderWriter().recursiveDelete(serverPaths.iterator().next().toString(),
++          NodeMissingPolicy.FAIL);
++
++      Wait.waitFor(() -> pingServer(client, tserver.toHostPortString()) == 
false, 60_000);
+       return true;
+     }
+ 
+   }
+ 
+   private boolean pingServer(AccumuloClient client, String server) {
+     final boolean lockVerificationThreadInUse = USE_VERIFICATION_THREAD.get();
+     try {
+       client.instanceOperations().ping(server);
+       return true;
+     } catch (AccumuloException e) {
+       if (lockVerificationThreadInUse) {
+         // If the lock verification thread is in use, the the TabletServer
+         // should shut down and the call to ping will throw an Exception
+         return false;
+       } else {
+         // With the lock verification thread disabled, the StuckWatcher
+         // should prevent the TabletServer from shutting down during
+         // this test method.
+         fail("TabletServer unexpectedly shut down");
+         return false;
+       }
+     }
+ 
+   }
+ 
+ }

Reply via email to