Repository: accumulo
Updated Branches:
  refs/heads/1.6 520d802ec -> 1f3288b20
  refs/heads/master 7e0121d11 -> 6236c4d47


ACCUMULO-3304 Track assignment execution duration and warn when assignments run
longer than a configurable threshold.

Introduce a new configuration property, tserver.assignment.duration.warning
(default 10m), which controls how long an assignment must run before a
warning is printed. Running assignments are checked every half of that
value, but no more often than every 5 seconds.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/995080c4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/995080c4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/995080c4

Branch: refs/heads/1.6
Commit: 995080c44c2bf296eb58764cab6f536ce5a808a1
Parents: 520d802
Author: Josh Elser <els...@apache.org>
Authored: Sat Nov 8 12:20:19 2014 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Sat Nov 8 12:20:19 2014 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |  2 +
 server/tserver/pom.xml                          |  9 +--
 .../tserver/ActiveAssignmentRunnable.java       | 78 ++++++++++++++++++++
 .../accumulo/tserver/RunnableStartedAt.java     | 51 +++++++++++++
 .../apache/accumulo/tserver/TabletServer.java   | 13 ++--
 .../tserver/TabletServerResourceManager.java    | 75 +++++++++++++++++--
 6 files changed, 210 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/995080c4/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index a03b210..aec7af5 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -278,6 +278,8 @@ public enum Property {
       "resiliency in the face of unexpected power outages, at the cost of 
speed. If method is not available, the legacy 'sync' method " +
       "will be used to ensure backwards compatibility with older Hadoop 
versions. A value of 'hflush' is the alternative to the default value " +
       "of 'hsync' which will result in faster writes, but with less 
durability"),
+  // Threshold read by TabletServerResourceManager.AssignmentWatcher; assignments running longer than this are logged with
+  // their executing thread's stack trace.
+  TSERV_ASSIGNMENT_DURATION_WARNING("tserver.assignment.duration.warning", "10m", PropertyType.TIMEDURATION,
+      "The amount of time an assignment can run before the server will print a warning along with the current stack trace. "
+          + "Meant to help debug stuck assignments"),
 
   // properties that are specific to logger server behavior
   LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this 
category affect the behavior of the write-ahead logger servers"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/995080c4/server/tserver/pom.xml
----------------------------------------------------------------------
diff --git a/server/tserver/pom.xml b/server/tserver/pom.xml
index 3ea50ad..1e08504 100644
--- a/server/tserver/pom.xml
+++ b/server/tserver/pom.xml
@@ -88,6 +88,10 @@
       <artifactId>zookeeper</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -99,11 +103,6 @@
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/995080c4/server/tserver/src/main/java/org/apache/accumulo/tserver/ActiveAssignmentRunnable.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ActiveAssignmentRunnable.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ActiveAssignmentRunnable.java
new file mode 100644
index 0000000..dcbdae7
--- /dev/null
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ActiveAssignmentRunnable.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import jline.internal.Preconditions;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps an assignment {@link Runnable} so that, while it executes, it is registered in a shared map of active
+ * assignments keyed by its {@link KeyExtent}, together with the time execution began. The registration is removed when
+ * the delegate returns or throws. {@link #getException()} may be called from another thread to capture the executing
+ * thread's current stack trace.
+ * <p>
+ * NOTE(review): argument checks use {@code jline.internal.Preconditions}, an internal class of the console library;
+ * a general-purpose precondition utility would be a less surprising dependency for server code.
+ */
+public class ActiveAssignmentRunnable implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(ActiveAssignmentRunnable.class);
+
+  private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;
+  private final KeyExtent extent;
+  private final Runnable delegate;
+
+  // Make sure that the other thread calling getException will see the assignment by the thread calling run()
+  private volatile Thread executingThread;
+
+  /**
+   * @param activeAssignments shared map in which this assignment registers itself while it runs
+   * @param extent the extent being assigned; used as the key in {@code activeAssignments}
+   * @param delegate the assignment work to perform
+   *
+   *          Each argument is checked with {@code Preconditions.checkNotNull}.
+   */
+  public ActiveAssignmentRunnable(ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments, KeyExtent extent, Runnable delegate) {
+    Preconditions.checkNotNull(activeAssignments);
+    Preconditions.checkNotNull(extent);
+    Preconditions.checkNotNull(delegate);
+    this.activeAssignments = activeAssignments;
+    this.extent = extent;
+    this.delegate = delegate;
+  }
+
+  /**
+   * Registers this assignment, runs the delegate, and always removes the registration afterwards.
+   *
+   * @throws IllegalStateException if an assignment for the same extent is already registered
+   */
+  @Override
+  public void run() {
+    executingThread = Thread.currentThread();
+
+    final RunnableStartedAt runnableWithStartTime = new RunnableStartedAt(this, System.currentTimeMillis());
+    // putIfAbsent performs the "already running?" check and the registration as one atomic step. A separate
+    // containsKey()/put() pair would let two runnables for the same extent both pass the check, after which the first
+    // to finish would remove the entry the other one still depends on.
+    if (null != activeAssignments.putIfAbsent(extent, runnableWithStartTime)) {
+      throw new IllegalStateException("Active assignment already exists for " + extent);
+    }
+
+    try {
+      log.trace("Started assignment for {} at {}", extent, runnableWithStartTime.getStartTime());
+      delegate.run();
+    } finally {
+      if (log.isTraceEnabled()) {
+        // Avoid the call to currentTimeMillis if we'd just throw it away anyways
+        log.trace("Finished assignment for {} at {}", extent, System.currentTimeMillis());
+      }
+      // Remove only our own registration, never an entry installed by some other runnable.
+      activeAssignments.remove(extent, runnableWithStartTime);
+    }
+  }
+
+  /**
+   * Builds an exception, suitable for logging, whose stack trace is the current stack of the thread executing this
+   * assignment. If {@link #run()} has not started yet, the exception keeps the calling thread's stack trace.
+   *
+   * @return a new exception describing this assignment
+   */
+  public Exception getException() {
+    final Exception e = new Exception("Assignment of " + extent);
+    if (null != executingThread) {
+      e.setStackTrace(executingThread.getStackTrace());
+    }
+    return e;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/995080c4/server/tserver/src/main/java/org/apache/accumulo/tserver/RunnableStartedAt.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/RunnableStartedAt.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/RunnableStartedAt.java
new file mode 100644
index 0000000..6513091
--- /dev/null
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/RunnableStartedAt.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.AbstractMap;
+import java.util.Map.Entry;
+
+/**
+ * Encapsulation of a task and the time it began execution.
+ * <p>
+ * {@link ActiveAssignmentRunnable} creates one of these from {@code System.currentTimeMillis()} at the start of its
+ * {@code run()} method, so the recorded time is when execution began, not when the task was queued.
+ * <p>
+ * NOTE(review): {@code setValue} is inherited from {@link AbstractMap.SimpleEntry}, so the start time can be replaced
+ * after construction.
+ */
+public class RunnableStartedAt extends AbstractMap.SimpleEntry<ActiveAssignmentRunnable,Long> {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param task the task being executed
+   * @param startedAtMillis the start time in milliseconds (ActiveAssignmentRunnable passes System.currentTimeMillis())
+   */
+  public RunnableStartedAt(ActiveAssignmentRunnable task, Long startedAtMillis) {
+    super(task, startedAtMillis);
+  }
+
+  /**
+   * Copies the task and start time from an existing entry.
+   *
+   * @param entry entry whose key is the task and whose value is the start time in milliseconds
+   */
+  public RunnableStartedAt(Entry<? extends ActiveAssignmentRunnable,? extends Long> entry) {
+    super(entry);
+  }
+
+  /**
+   * @return The task being executed
+   */
+  public ActiveAssignmentRunnable getTask() {
+    return getKey();
+  }
+
+  /**
+   * @return The time, in millis, at which the runnable began execution
+   */
+  public Long getStartTime() {
+    return getValue();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/995080c4/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 94be0bb..8ef44da 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -2282,7 +2282,8 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
       // add the assignment job to the appropriate queue
       log.info("Loading tablet " + extent);
 
-      final Runnable ah = new LoggingRunnable(log, new 
AssignmentHandler(extent));
+      final AssignmentHandler ah = new AssignmentHandler(extent);
+      // final Runnable ah = new LoggingRunnable(log, );
       // Root tablet assignment must take place immediately
       if (extent.isRootTablet()) {
         new Daemon("Root Tablet Assignment") {
@@ -2299,9 +2300,9 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
         }.start();
       } else {
         if (extent.isMeta()) {
-          resourceManager.addMetaDataAssignment(ah);
+          resourceManager.addMetaDataAssignment(extent, log, ah);
         } else {
-          resourceManager.addAssignment(ah);
+          resourceManager.addAssignment(extent, log, ah);
         }
       }
     }
@@ -2824,7 +2825,7 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
     }
   }
 
-  private class AssignmentHandler implements Runnable {
+  protected class AssignmentHandler implements Runnable {
     private KeyExtent extent;
     private int retryAttempt = 0;
 
@@ -2979,10 +2980,10 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
               if (extent.isRootTablet()) {
                 new Daemon(new LoggingRunnable(log, handler), "Root tablet 
assignment retry").start();
               } else {
-                resourceManager.addMetaDataAssignment(handler);
+                resourceManager.addMetaDataAssignment(extent, log, handler);
               }
             } else {
-              resourceManager.addAssignment(handler);
+              resourceManager.addAssignment(extent, log, handler);
             }
           }
         }, reschedule);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/995080c4/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 935ffeb..7c0eedc 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -54,6 +55,7 @@ import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.trace.instrument.TraceExecutorService;
 import org.apache.accumulo.tserver.FileManager.ScanFileManager;
 import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.tserver.TabletServer.AssignmentHandler;
 import org.apache.accumulo.tserver.compaction.CompactionStrategy;
 import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
@@ -62,9 +64,9 @@ import org.apache.log4j.Logger;
 
 /**
  * ResourceManager is responsible for managing the resources of all tablets 
within a tablet server.
- * 
- * 
- * 
+ *
+ *
+ *
  */
 public class TabletServerResourceManager {
 
@@ -82,6 +84,8 @@ public class TabletServerResourceManager {
   private ExecutorService defaultReadAheadThreadPool;
   private Map<String,ExecutorService> threadPools = new 
TreeMap<String,ExecutorService>();
 
+  private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> 
activeAssignments;
+
   private HashSet<TabletResourceManager> tabletResources;
 
   private final VolumeManager fs;
@@ -196,6 +200,8 @@ public class TabletServerResourceManager {
 
     assignMetaDataPool = createEs(0, 1, 60, "metadata tablet assignment");
 
+    activeAssignments = new ConcurrentHashMap<KeyExtent,RunnableStartedAt>();
+
     readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, 
"tablet read ahead");
     defaultReadAheadThreadPool = 
createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, "metadata tablets 
read ahead");
 
@@ -209,6 +215,61 @@ public class TabletServerResourceManager {
     memoryManager.init(conf);
     memMgmt = new MemoryManagementFramework();
     memMgmt.startThreads();
+
+    SimpleTimer timer = SimpleTimer.getInstance();
+
+    // We can use the same map for both metadata and normal assignments since 
the keyspace (extent)
+    // is guaranteed to be unique. Schedule the task once, the task will 
reschedule itself.
+    timer.schedule(new AssignmentWatcher(acuConf, activeAssignments, timer), 
5000);
+  }
+
+  /**
+   * Periodically inspects a map of running assignments and logs a warning, including the executing thread's stack trace,
+   * for every assignment that has been running longer than {@link Property#TSERV_ASSIGNMENT_DURATION_WARNING}. After each
+   * pass the watcher reschedules itself on the supplied timer, at half of the configured threshold but never more often
+   * than every 5 seconds.
+   */
+  protected static class AssignmentWatcher implements Runnable {
+    private static final Logger log = Logger.getLogger(AssignmentWatcher.class);
+
+    /** Lower bound, in milliseconds, on the interval between two passes of the watcher. */
+    private static final long MIN_DELAY_MS = 5000L;
+
+    private final Map<KeyExtent,RunnableStartedAt> activeAssignments;
+    private final AccumuloConfiguration conf;
+    private final SimpleTimer timer;
+
+    /**
+     * @param conf configuration supplying the warning threshold; re-read on every pass
+     * @param activeAssignments map of currently running assignments to inspect
+     * @param timer timer on which this watcher reschedules itself
+     */
+    public AssignmentWatcher(AccumuloConfiguration conf, Map<KeyExtent,RunnableStartedAt> activeAssignments, SimpleTimer timer) {
+      this.conf = conf;
+      this.activeAssignments = activeAssignments;
+      this.timer = timer;
+    }
+
+    @Override
+    public void run() {
+      // Fallback interval used when the threshold cannot be read, so the watcher keeps running instead of silently
+      // stopping.
+      long delay = MIN_DELAY_MS;
+      try {
+        // Read inside the try: an exception here must not prevent the reschedule in the finally block.
+        final long millisBeforeWarning = conf.getTimeInMillis(Property.TSERV_ASSIGNMENT_DURATION_WARNING);
+        // Check at half the threshold, but don't run more often than every 5s
+        delay = Math.max(millisBeforeWarning / 2, MIN_DELAY_MS);
+
+        final long now = System.currentTimeMillis();
+        for (Entry<KeyExtent,RunnableStartedAt> entry : activeAssignments.entrySet()) {
+          final KeyExtent extent = entry.getKey();
+          final RunnableStartedAt runnable = entry.getValue();
+          final long duration = now - runnable.getStartTime();
+
+          // Print a warning if an assignment has been running for over the configured time length
+          if (duration > millisBeforeWarning) {
+            log.warn("Assignment for " + extent + " has been running for at least " + duration + "ms", runnable.getTask().getException());
+          } else if (log.isTraceEnabled()) {
+            log.trace("Assignment for " + extent + " only running for " + duration + "ms");
+          }
+        }
+      } catch (Exception e) {
+        log.warn("Caught exception checking active assignments", e);
+      } finally {
+        if (log.isTraceEnabled()) {
+          log.trace("Rescheduling assignment watcher to run in " + delay + "ms");
+        }
+        timer.schedule(this, delay);
+      }
+    }
   }
 
   private static class TabletStateImpl implements TabletState, Cloneable {
@@ -647,12 +708,12 @@ public class TabletServerResourceManager {
     }
   }
 
-  public void addAssignment(Runnable assignmentHandler) {
-    assignmentPool.execute(assignmentHandler);
+  /**
+   * Queues a tablet assignment on the general assignment pool. The handler is wrapped in a {@link LoggingRunnable} bound
+   * to {@code log}, and then in an {@link ActiveAssignmentRunnable} so it is tracked in {@code activeAssignments} while it
+   * executes.
+   */
+  public void addAssignment(KeyExtent extent, Logger log, AssignmentHandler assignmentHandler) {
+    final Runnable loggingHandler = new LoggingRunnable(log, assignmentHandler);
+    final Runnable trackedHandler = new ActiveAssignmentRunnable(activeAssignments, extent, loggingHandler);
+    assignmentPool.execute(trackedHandler);
   }

-  public void addMetaDataAssignment(Runnable assignmentHandler) {
-    assignMetaDataPool.execute(assignmentHandler);
+  /**
+   * Queues a metadata tablet assignment on the dedicated metadata assignment pool, wrapped the same way as
+   * {@link #addAssignment(KeyExtent, Logger, AssignmentHandler)}.
+   */
+  public void addMetaDataAssignment(KeyExtent extent, Logger log, AssignmentHandler assignmentHandler) {
+    final Runnable loggingHandler = new LoggingRunnable(log, assignmentHandler);
+    final Runnable trackedHandler = new ActiveAssignmentRunnable(activeAssignments, extent, loggingHandler);
+    assignMetaDataPool.execute(trackedHandler);
   }
 
   public void addMigration(KeyExtent tablet, Runnable migrationHandler) {

Reply via email to