Repository: accumulo
Updated Branches:
  refs/heads/master 18fee706b -> 3bd0caa58


ACCUMULO-2419 Reimplement SimpleTimer using executors

The SimpleTimer class is changed to use Java's executor service for thread 
pools, rather
than java.util.Timer. This allows Accumulo to run SimpleTimer threads using 
more than
one thread, for systems that can handle the load.

This change also transitions a wide variety of SimpleTimer users to use the new 
form of the
SimpleTimer.getInstance() method.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3bd0caa5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3bd0caa5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3bd0caa5

Branch: refs/heads/master
Commit: 3bd0caa5877e16fd3ff98863040dc6eb67fdeef2
Parents: 18fee70
Author: Bill Havanki <bhava...@cloudera.com>
Authored: Mon Mar 3 13:51:37 2014 -0500
Committer: Bill Havanki <bhava...@cloudera.com>
Committed: Tue Mar 18 09:49:01 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   2 +
 .../impl/MiniAccumuloClusterImpl.java           |   2 +-
 .../org/apache/accumulo/server/Accumulo.java    |   7 +-
 .../accumulo/server/master/LiveTServerSet.java  |   2 +-
 .../accumulo/server/util/TServerUtils.java      |  17 +--
 .../accumulo/server/util/time/SimpleTimer.java  | 124 ++++++++++++++-----
 .../server/zookeeper/DistributedWorkQueue.java  |   7 +-
 .../server/util/time/SimpleTimerTest.java       |  99 +++++++++++++++
 .../accumulo/gc/SimpleGarbageCollector.java     |   4 +-
 .../java/org/apache/accumulo/master/Master.java |   4 +-
 .../master/recovery/RecoveryManager.java        |  14 ++-
 .../accumulo/master/tableOps/BulkImport.java    |   2 +-
 .../accumulo/monitor/servlets/BasicServlet.java |   2 +-
 .../org/apache/accumulo/tracer/TraceServer.java |   2 +-
 .../accumulo/tserver/CompactionWatcher.java     |   2 +-
 .../apache/accumulo/tserver/FileManager.java    |   2 +-
 .../apache/accumulo/tserver/TabletServer.java   |  24 ++--
 .../tserver/TabletServerResourceManager.java    |   2 +-
 .../apache/accumulo/tserver/log/LogSorter.java  |   2 +-
 .../accumulo/test/functional/ZombieTServer.java |   2 +-
 .../test/performance/thrift/NullTserver.java    |   2 +-
 21 files changed, 250 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index efa7eb5..953ca8e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -156,6 +156,8 @@ public enum Property {
   GENERAL_KERBEROS_PRINCIPAL("general.kerberos.principal", "", 
PropertyType.STRING, "Name of the kerberos principal to use. _HOST will 
automatically be "
       + "replaced by the machines hostname in the hostname portion of the 
principal. Leave blank if not using kerberoized hdfs"),
   GENERAL_MAX_MESSAGE_SIZE("general.server.message.size.max", "1G", 
PropertyType.MEMORY, "The maximum size of a message that can be sent to a 
server."),
+  
GENERAL_SIMPLETIMER_THREADPOOL_SIZE("general.server.simpletimer.threadpool.size",
 "1", PropertyType.COUNT, "The number of threads to use for "
+      + "server-internal scheduled tasks"),
   @Experimental
   GENERAL_VOLUME_CHOOSER("general.volume.chooser", 
"org.apache.accumulo.server.fs.RandomVolumeChooser", PropertyType.CLASSNAME,
       "The class that will be used to select which volume will be used to 
create new files."),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
 
b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
index 35a7b9d..2252b1a 100644
--- 
a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
@@ -93,7 +93,7 @@ public class MiniAccumuloClusterImpl {
       this.in = new BufferedReader(new InputStreamReader(stream));
       out = new BufferedWriter(new FileWriter(logFile));
 
-      SimpleTimer.getInstance().schedule(new Runnable() {
+      SimpleTimer.getInstance(null).schedule(new Runnable() {
         @Override
         public void run() {
           try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java 
b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
index 2fa9051..21e9955 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
 import java.util.TreeMap;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.trace.DistributedTrace;
 import org.apache.accumulo.core.util.AddressUtil;
@@ -140,14 +141,14 @@ public class Accumulo {
       log.info(key + " = " + (Property.isSensitive(key) ? "<hidden>" : 
entry.getValue()));
     }
     
-    monitorSwappiness();
+    monitorSwappiness(config.getConfiguration());
   }
   
   /**
    * 
    */
-  public static void monitorSwappiness() {
-    SimpleTimer.getInstance().schedule(new Runnable() {
+  public static void monitorSwappiness(AccumuloConfiguration config) {
+    SimpleTimer.getInstance(config).schedule(new Runnable() {
       @Override
       public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
 
b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
index 63bd894..f0fea77 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
@@ -226,7 +226,7 @@ public class LiveTServerSet implements Watcher {
 
   public synchronized void startListeningForTabletServerChanges() {
     scanServers();
-    SimpleTimer.getInstance().schedule(new Runnable() {
+    SimpleTimer.getInstance(conf).schedule(new Runnable() {
       @Override
       public void run() {
         scanServers();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java 
b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index 6d9e4c7..dc243c2 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -119,7 +119,8 @@ public class TServerUtils {
           port = 1024 + port % (65535 - 1024);
         try {
           HostAndPort addr = HostAndPort.fromParts(address, port);
-          return TServerUtils.startTServer(addr, timedProcessor, serverName, 
threadName, minThreads, timeBetweenThreadChecks, maxMessageSize,
+          return TServerUtils.startTServer(addr, timedProcessor, serverName, 
threadName, minThreads,
+              conf.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 
timeBetweenThreadChecks, maxMessageSize,
               SslConnectionParams.forServer(conf), 
conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
         } catch (TTransportException ex) {
           log.error("Unable to start TServer", ex);
@@ -234,7 +235,7 @@ public class TServerUtils {
   }
 
   public static ServerAddress createHsHaServer(HostAndPort address, TProcessor 
processor, final String serverName, String threadName, final int numThreads,
-      long timeBetweenThreadChecks, long maxMessageSize) throws 
TTransportException {
+    final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) 
throws TTransportException {
     TNonblockingServerSocket transport = new TNonblockingServerSocket(new 
InetSocketAddress(address.getHostText(), address.getPort()));
     THsHaServer.Args options = new THsHaServer.Args(transport);
     options.protocolFactory(ThriftUtil.protocolFactory());
@@ -246,7 +247,7 @@ public class TServerUtils {
      */
     final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, 
"ClientPool");
     // periodically adjust the number of threads we need by checking how busy 
our threads are
-    SimpleTimer.getInstance().schedule(new Runnable() {
+    SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() {
       @Override
       public void run() {
         if (pool.getCorePoolSize() <= pool.getActiveCount()) {
@@ -316,20 +317,20 @@ public class TServerUtils {
     return new ServerAddress(createThreadPoolServer(transport, processor), 
address);
   }
 
-  public static ServerAddress startTServer(HostAndPort address, TProcessor 
processor, String serverName, String threadName, int numThreads,
+  public static ServerAddress startTServer(HostAndPort address, TProcessor 
processor, String serverName, String threadName, int numThreads, int 
numSTThreads,
       long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams 
sslParams, long sslSocketTimeout) throws TTransportException {
-    return startTServer(address, new TimedProcessor(processor, serverName, 
threadName), serverName, threadName, numThreads, timeBetweenThreadChecks,
-        maxMessageSize, sslParams, sslSocketTimeout);
+    return startTServer(address, new TimedProcessor(processor, serverName, 
threadName), serverName, threadName, numThreads, numSTThreads,
+        timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout);
   }
 
   public static ServerAddress startTServer(HostAndPort address, TimedProcessor 
processor, String serverName, String threadName, int numThreads,
-      long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams 
sslParams, long sslSocketTimeout) throws TTransportException {
+    int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, 
SslConnectionParams sslParams, long sslSocketTimeout) throws 
TTransportException {
 
     ServerAddress serverAddress;
     if (sslParams != null) {
       serverAddress = createSslThreadPoolServer(address, processor, 
sslSocketTimeout, sslParams);
     } else {
-      serverAddress = createHsHaServer(address, processor, serverName, 
threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
+      serverAddress = createHsHaServer(address, processor, serverName, 
threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
     }
     final TServer finalServer = serverAddress.server;
     Runnable serveTask = new Runnable() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java
 
b/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java
index 499b0de..b8f7f6d 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java
@@ -16,56 +16,120 @@
  */
 package org.apache.accumulo.server.util.time;
 
-import java.util.Timer;
-import java.util.TimerTask;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.log4j.Logger;
 
 /**
- * Generic singleton timer: don't use it if you are going to do anything that 
will take very long. Please use it to reduce the number of threads dedicated to
+ * Generic singleton timer. Don't use this if you are going to do anything 
that will take very long. Please use it to reduce the number of threads 
dedicated to
  * simple events.
  * 
  */
 public class SimpleTimer {
+  private static final Logger log = Logger.getLogger(SimpleTimer.class);
   
-  static class LoggingTimerTask extends TimerTask {
-    
-    private Runnable task;
-    
-    LoggingTimerTask(Runnable task) {
-      this.task = task;
-    }
-    
-    @Override
-    public void run() {
-      try {
-        task.run();
-      } catch (Throwable t) {
-        Logger.getLogger(LoggingTimerTask.class).warn("Timer task failed " + 
task.getClass().getName() + " " + t.getMessage(), t);
-      }
+  private static class ExceptionHandler implements 
Thread.UncaughtExceptionHandler {
+    public void uncaughtException(Thread t, Throwable e) {
+      log.warn("SimpleTimer task failed", e);
     }
-    
   }
 
+  private static int instanceThreadPoolSize = -1;
   private static SimpleTimer instance;
-  private Timer timer;
+  private ScheduledExecutorService executor;
   
+  private static final int DEFAULT_THREAD_POOL_SIZE = 1;
+  /**
+   * Gets the timer instance.
+   *
+   * @deprecated Use {@link #getInstance(AccumuloConfiguration)} instead to
+   * get the configured number of threads.
+   */
+  @Deprecated
   public static synchronized SimpleTimer getInstance() {
-    if (instance == null)
-      instance = new SimpleTimer();
+    return getInstance(null);
+  }
+  /**
+   * Gets the timer instance. If an instance has already been created, it will
+   * have the number of threads supplied when it was constructed, and the size
+   * provided here is ignored.
+   *
+   * @param threadPoolSize number of threads
+   */
+  public static synchronized SimpleTimer getInstance(int threadPoolSize) {
+    if (instance == null) {
+      instance = new SimpleTimer(threadPoolSize);
+      instance.instanceThreadPoolSize = threadPoolSize;
+    } else {
+      if (instance.instanceThreadPoolSize != threadPoolSize) {
+        log.warn("Asked to create SimpleTimer with thread pool size " +
+                 threadPoolSize + ", existing instance has " +
+                 instanceThreadPoolSize);
+      }
+    }
     return instance;
   }
-  
-  private SimpleTimer() {
-    timer = new Timer("SimpleTimer", true);
+  /**
+   * Gets the timer instance. If an instance has already been created, it will
+   * have the number of threads supplied when it was constructed, and the size
+   * provided by the configuration here is ignored. If a null configuration is
+   * supplied, the number of threads defaults to 1.
+   *
+   * @param conf configuration from which to get the number of threads
+   * @see Property#GENERAL_SIMPLETIMER_THREADPOOL_SIZE
+   */
+  public static synchronized SimpleTimer getInstance(AccumuloConfiguration 
conf) {
+    int threadPoolSize;
+    if (conf != null) {
+      threadPoolSize = 
conf.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE);
+    } else {
+      threadPoolSize = DEFAULT_THREAD_POOL_SIZE;
+    }
+    return getInstance(threadPoolSize);
+  }
+
+  /**
+   * Gets the thread pool size for the timer instance. Use for testing only.
+   *
+   * @return thread pool size for timer instance, or -1 if not yet constructed
+   */
+  @VisibleForTesting
+  static int getInstanceThreadPoolSize() {
+    return instanceThreadPoolSize;
   }
   
-  public void schedule(Runnable task, long delay) {
-    timer.schedule(new LoggingTimerTask(task), delay);
+  private SimpleTimer(int threadPoolSize) {
+    executor = Executors.newScheduledThreadPool(threadPoolSize, new 
ThreadFactoryBuilder().setNameFormat("SimpleTimer-%d").setDaemon(true)
+      .setUncaughtExceptionHandler(new ExceptionHandler()).build());
   }
   
-  public void schedule(Runnable task, long delay, long period) {
-    timer.schedule(new LoggingTimerTask(task), delay, period);
+  /**
+   * Schedules a task to run in the future.
+   *
+   * @param task task to run
+   * @param delay number of milliseconds to wait before execution
+   * @return future for scheduled task
+   */
+  public ScheduledFuture<?> schedule(Runnable task, long delay) {
+    return executor.schedule(task, delay, TimeUnit.MILLISECONDS);
   }
   
+  /**
+   * Schedules a task to run in the future with a fixed delay between repeated
+   * executions.
+   *
+   * @param task task to run
+   * @param delay number of milliseconds to wait before first execution
+   * @param period number of milliseconds to wait between executions
+   * @return future for scheduled task
+   */
+  public ScheduledFuture<?> schedule(Runnable task, long delay, long period) {
+    return executor.scheduleWithFixedDelay(task, delay, period, 
TimeUnit.MILLISECONDS);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
 
b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
index c5a9528..4738c2b 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.util.time.SimpleTimer;
@@ -47,6 +48,7 @@ public class DistributedWorkQueue {
   private ThreadPoolExecutor threadPool;
   private ZooReaderWriter zoo = ZooReaderWriter.getInstance();
   private String path;
+  private AccumuloConfiguration config;
 
   private AtomicInteger numTask = new AtomicInteger(0);
 
@@ -147,8 +149,9 @@ public class DistributedWorkQueue {
     void process(String workID, byte[] data);
   }
   
-  public DistributedWorkQueue(String path) {
+  public DistributedWorkQueue(String path, AccumuloConfiguration config) {
     this.path = path;
+    this.config = config;
   }
   
   public void startProcessing(final Processor processor, ThreadPoolExecutor 
executorService) throws KeeperException, InterruptedException {
@@ -189,7 +192,7 @@ public class DistributedWorkQueue {
     
     Random r = new Random();
     // Add a little jitter to avoid all the tservers slamming zookeeper at once
-    SimpleTimer.getInstance().schedule(new Runnable() {
+    SimpleTimer.getInstance(config).schedule(new Runnable() {
       @Override
       public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/base/src/test/java/org/apache/accumulo/server/util/time/SimpleTimerTest.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/util/time/SimpleTimerTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/util/time/SimpleTimerTest.java
new file mode 100644
index 0000000..0a59812
--- /dev/null
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/util/time/SimpleTimerTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util.time;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class SimpleTimerTest {
+  private static final long DELAY = 1000L;
+  private static final long PERIOD = 2000L;
+  private static final long PAD = 100L;
+  private SimpleTimer t;
+
+  @Before
+  public void setUp() {
+    t = SimpleTimer.getInstance(null);
+  }
+
+  private static class Incrementer implements Runnable {
+    private final AtomicInteger i;
+    private volatile boolean canceled;
+
+    Incrementer(AtomicInteger i) {
+      this.i = i;
+      canceled = false;
+    }
+
+    public void run() {
+      if (canceled) {
+        return;
+      }
+      i.incrementAndGet();
+    }
+
+    public void cancel() {
+      canceled = true;
+    }
+  }
+
+  private static class Thrower implements Runnable {
+    boolean wasRun = false;
+    public void run() {
+      wasRun = true;
+      throw new IllegalStateException("You shall not pass");
+    }
+  }
+
+  @Test
+  public void testOneTimeSchedule() throws InterruptedException {
+    AtomicInteger i = new AtomicInteger();
+    Incrementer r = new Incrementer(i);
+    t.schedule(r, DELAY);
+    Thread.sleep(DELAY + PAD);
+    assertEquals(1, i.get());
+    r.cancel();
+  }
+
+  @Test
+  public void testFixedDelaySchedule() throws InterruptedException {
+    AtomicInteger i = new AtomicInteger();
+    Incrementer r = new Incrementer(i);
+    t.schedule(r, DELAY, PERIOD);
+    Thread.sleep(DELAY + (2 * PERIOD) + PAD);
+    assertEquals(3, i.get());
+    r.cancel();
+  }
+
+  @Test
+  public void testFailure() throws InterruptedException {
+    Thrower r = new Thrower();
+    t.schedule(r, DELAY);
+    Thread.sleep(DELAY + PAD);
+    assertTrue(r.wasRun);
+  }
+
+  @Test
+  public void testSingleton() {
+    assertEquals(1, SimpleTimer.getInstanceThreadPoolSize());
+    SimpleTimer t2 = SimpleTimer.getInstance(2);
+    assertSame(t, t2);
+    assertEquals(1, SimpleTimer.getInstanceThreadPoolSize());  // unchanged
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 89925b4..95b12fe 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -555,8 +555,8 @@ public class SimpleGarbageCollector implements Iface {
     HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port);
     log.debug("Starting garbage collector listening on " + result);
     try {
-      return TServerUtils.startTServer(result, processor, 
this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize,
-          SslConnectionParams.forServer(conf), 0).address;
+      return TServerUtils.startTServer(result, processor, 
this.getClass().getSimpleName(), "GC Monitor Service", 2,
+          conf.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, 
maxMessageSize, SslConnectionParams.forServer(conf), 0).address;
     } catch (Exception ex) {
       log.fatal(ex, ex);
       throw new RuntimeException(ex);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java 
b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 8e98c04..4b4141b 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -207,7 +207,7 @@ public class Master implements LiveTServerSet.Listener, 
TableObserver, CurrentSt
     if (newState == MasterState.STOP) {
       // Give the server a little time before shutdown so the client
       // thread requesting the stop can return
-      SimpleTimer.getInstance().schedule(new Runnable() {
+      SimpleTimer.getInstance(serverConfig.getConfiguration()).schedule(new 
Runnable() {
         @Override
         public void run() {
           // This frees the main thread and will cause the master to exit
@@ -901,7 +901,7 @@ public class Master implements LiveTServerSet.Listener, 
TableObserver, CurrentSt
       fate = new Fate<Master>(this, store);
       fate.startTransactionRunners(threads);
 
-      SimpleTimer.getInstance().schedule(new Runnable() {
+      SimpleTimer.getInstance(serverConfig.getConfiguration()).schedule(new 
Runnable() {
 
         @Override
         public void run() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
 
b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
index 76d3520..a9adf8f 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
@@ -63,7 +63,8 @@ public class RecoveryManager {
     executor = Executors.newScheduledThreadPool(4, new 
NamingThreadFactory("Walog sort starter "));
     zooCache = new ZooCache();
     try {
-      List<String> workIDs = new 
DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + 
Constants.ZRECOVERY).getWorkQueued();
+      AccumuloConfiguration aconf = 
master.getConfiguration().getConfiguration();
+      List<String> workIDs = new 
DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + 
Constants.ZRECOVERY, aconf).getWorkQueued();
       sortsQueued.addAll(workIDs);
     } catch (Exception e) {
       log.warn(e, e);
@@ -87,14 +88,14 @@ public class RecoveryManager {
     public void run() {
       boolean rescheduled = false;
       try {
-
-        long time = closer.close(master.getConfiguration().getConfiguration(), 
master.getFileSystem(), new Path(source));
+        AccumuloConfiguration aconf = 
master.getConfiguration().getConfiguration();
+        long time = closer.close(aconf, master.getFileSystem(), new 
Path(source));
 
         if (time > 0) {
           executor.schedule(this, time, TimeUnit.MILLISECONDS);
           rescheduled = true;
         } else {
-          initiateSort(sortId, source, destination);
+          initiateSort(sortId, source, destination, aconf);
         }
       } catch (FileNotFoundException e) {
         log.debug("Unable to initate log sort for " + source + ": " + e);
@@ -111,9 +112,10 @@ public class RecoveryManager {
 
   }
 
-  private void initiateSort(String sortId, String source, final String 
destination) throws KeeperException, InterruptedException, IOException {
+  private void initiateSort(String sortId, String source, final String 
destination, AccumuloConfiguration aconf)
+    throws KeeperException, InterruptedException, IOException {
     String work = source + "|" + destination;
-    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + 
Constants.ZRECOVERY).addWork(sortId, work.getBytes(Constants.UTF8));
+    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + 
Constants.ZRECOVERY, aconf).addWork(sortId, work.getBytes(Constants.UTF8));
 
     synchronized (this) {
       sortsQueued.add(sortId);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
 
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index bdc89dd..82abf22 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -432,7 +432,7 @@ class CopyFailed extends MasterRepo {
     
     if (loadedFailures.size() > 0) {
       DistributedWorkQueue bifCopyQueue = new 
DistributedWorkQueue(Constants.ZROOT + "/" + 
HdfsZooInstance.getInstance().getInstanceID()
-          + Constants.ZBULK_FAILED_COPYQ);
+          + Constants.ZBULK_FAILED_COPYQ, 
master.getConfiguration().getConfiguration());
       
       HashSet<String> workIds = new HashSet<String>();
       

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
----------------------------------------------------------------------
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
index bf65dae..3cbc0d3 100644
--- 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
@@ -105,7 +105,7 @@ abstract public class BasicServlet extends HttpServlet {
     synchronized (BasicServlet.class) {
       // Learn our instance name asynchronously so we don't hang up if 
zookeeper is down
       if (cachedInstanceName == null) {
-        SimpleTimer.getInstance().schedule(new TimerTask() {
+        SimpleTimer.getInstance(Monitor.getSystemConfiguration()).schedule(new 
TimerTask() {
           @Override
           public void run() {
             synchronized (BasicServlet.class) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --git 
a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java 
b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 30f1ae7..7aab5cd 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@ -227,7 +227,7 @@ public class TraceServer implements Watcher {
   }
   
   public void run() throws Exception {
-    SimpleTimer.getInstance().schedule(new Runnable() {
+    
SimpleTimer.getInstance(serverConfiguration.getConfiguration()).schedule(new 
Runnable() {
       @Override
       public void run() {
         flush();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java
index e6ca38f..2e4d7b7 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java
@@ -102,7 +102,7 @@ public class CompactionWatcher implements Runnable {
 
   public static synchronized void startWatching(AccumuloConfiguration config) {
     if (!watching) {
-      SimpleTimer.getInstance().schedule(new CompactionWatcher(config), 10000, 
10000);
+      SimpleTimer.getInstance(config).schedule(new CompactionWatcher(config), 
10000, 10000);
       watching = true;
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index bb95532..6b553a1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -177,7 +177,7 @@ public class FileManager {
     this.reservedReaders = new HashMap<FileSKVIterator,String>();
     
     this.maxIdleTime = 
conf.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
-    SimpleTimer.getInstance().schedule(new IdleFileCloser(), maxIdleTime, 
maxIdleTime / 2);
+    SimpleTimer.getInstance(conf.getConfiguration()).schedule(new 
IdleFileCloser(), maxIdleTime, maxIdleTime / 2);
     
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 3d58a99..a718748 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -261,8 +261,9 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
     this.serverConfig = conf;
     this.instance = conf.getInstance();
     this.fs = fs;
-    this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
-    SimpleTimer.getInstance().schedule(new Runnable() {
+    AccumuloConfiguration aconf = getSystemConfiguration();
+    this.logSorter = new LogSorter(instance, fs, aconf);
+    SimpleTimer.getInstance(aconf).schedule(new Runnable() {
       @Override
       public void run() {
         synchronized (onlineTablets) {
@@ -358,6 +359,7 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
     SecureRandom random;
     Map<Long,Session> sessions;
     long maxIdle;
+    AccumuloConfiguration aconf;
 
     SessionManager(AccumuloConfiguration conf) {
       random = new SecureRandom();
@@ -372,7 +374,8 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
         }
       };
 
-      SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
+      SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 
1000));
+      aconf = conf;
     }
 
     synchronized long createSession(Session session, boolean reserve) {
@@ -515,7 +518,7 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
           }
         };
 
-        SimpleTimer.getInstance().schedule(r, delay);
+        SimpleTimer.getInstance(aconf).schedule(r, delay);
       }
     }
 
@@ -2958,7 +2961,7 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
         enqueueMasterMessage(new 
TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
         long reschedule = Math.min((1l << Math.min(32, retryAttempt)) * 1000, 
10 * 60 * 1000l);
         log.warn(String.format("rescheduling tablet load in %.2f seconds", 
reschedule / 1000.));
-        SimpleTimer.getInstance().schedule(new TimerTask() {
+        SimpleTimer.getInstance(getSystemConfiguration()).schedule(new 
TimerTask() {
           @Override
           public void run() {
             log.info("adding tablet " + extent + " back to the assignment pool 
(retry " + retryAttempt + ")");
@@ -3167,7 +3170,7 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
 
     ThreadPoolExecutor distWorkQThreadPool = new 
SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS),
 "distributed work queue");
 
-    bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + 
Constants.ZBULK_FAILED_COPYQ);
+    bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + 
Constants.ZBULK_FAILED_COPYQ, getSystemConfiguration());
     try {
       bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), 
distWorkQThreadPool);
     } catch (Exception e1) {
@@ -3516,9 +3519,10 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
       }
     };
 
-    SimpleTimer.getInstance().schedule(contextCleaner, 60000, 60000);
+    AccumuloConfiguration aconf = getSystemConfiguration();
+    SimpleTimer.getInstance(aconf).schedule(contextCleaner, 60000, 60000);
 
-    FileSystemMonitor.start(getSystemConfiguration(), 
Property.TSERV_MONITOR_FS);
+    FileSystemMonitor.start(aconf, Property.TSERV_MONITOR_FS);
 
     Runnable gcDebugTask = new Runnable() {
       @Override
@@ -3527,7 +3531,7 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
       }
     };
 
-    SimpleTimer.getInstance().schedule(gcDebugTask, 0, 1000);
+    SimpleTimer.getInstance(aconf).schedule(gcDebugTask, 0, 1000);
 
     Runnable constraintTask = new Runnable() {
 
@@ -3545,7 +3549,7 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
       }
     };
 
-    SimpleTimer.getInstance().schedule(constraintTask, 0, 1000);
+    SimpleTimer.getInstance(aconf).schedule(constraintTask, 0, 1000);
 
     this.resourceManager = new TabletServerResourceManager(instance, fs);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 4eae109..f26c74b 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -108,7 +108,7 @@ public class TabletServerResourceManager {
 
   private ExecutorService addEs(final Property maxThreads, String name, final 
ThreadPoolExecutor tp) {
     ExecutorService result = addEs(name, tp);
-    SimpleTimer.getInstance().schedule(new Runnable() {
+    SimpleTimer.getInstance(conf.getConfiguration()).schedule(new Runnable() {
       @Override
       public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 8f783c3..1e3f895 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -213,7 +213,7 @@ public class LogSorter {
 
   public void startWatchingForRecoveryLogs(ThreadPoolExecutor 
distWorkQThreadPool) throws KeeperException, InterruptedException {
     this.threadPool = distWorkQThreadPool;
-    new DistributedWorkQueue(ZooUtil.getRoot(instance) + 
Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool);
+    new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY, 
conf).startProcessing(new LogProcessor(), this.threadPool);
   }
 
   public List<RecoveryStatus> getLogSorts() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java 
b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index f26c8d7..9b3eec7 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -98,7 +98,7 @@ public class ZombieTServer {
     TransactionWatcher watcher = new TransactionWatcher();
     final ThriftClientHandler tch = new ThriftClientHandler(instance, watcher);
     Processor<Iface> processor = new Processor<Iface>(tch);
-    ServerAddress serverPort = 
TServerUtils.startTServer(HostAndPort.fromParts("0.0.0.0", port), processor, 
"ZombieTServer", "walking dead", 2, 1000,
+    ServerAddress serverPort = 
TServerUtils.startTServer(HostAndPort.fromParts("0.0.0.0", port), processor, 
"ZombieTServer", "walking dead", 2, 1, 1000,
         10 * 1024 * 1024, null, -1);
     
     String addressString = serverPort.address.toString();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
 
b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 0591b19..6c34172 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -243,7 +243,7 @@ public class NullTserver {
     TransactionWatcher watcher = new TransactionWatcher();
     ThriftClientHandler tch = new 
ThriftClientHandler(HdfsZooInstance.getInstance(), watcher);
     Processor<Iface> processor = new Processor<Iface>(tch);
-    TServerUtils.startTServer(HostAndPort.fromParts("0.0.0.0", opts.port), 
processor, "NullTServer", "null tserver", 2, 1000, 10 * 1024 * 1024, null, -1);
+    TServerUtils.startTServer(HostAndPort.fromParts("0.0.0.0", opts.port), 
processor, "NullTServer", "null tserver", 2, 1, 1000, 10 * 1024 * 1024, null, 
-1);
     
     HostAndPort addr = 
HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);
     

Reply via email to