Merge branch '1.6'

Conflicts:
        core/src/main/java/org/apache/accumulo/core/conf/Property.java
        server/tserver/pom.xml
        server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
        server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6236c4d4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6236c4d4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6236c4d4

Branch: refs/heads/master
Commit: 6236c4d4761f11517c0c6b808b1f95072b668e20
Parents: 7e0121d 1f3288b
Author: Josh Elser <els...@apache.org>
Authored: Sat Nov 8 16:51:03 2014 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Sat Nov 8 16:51:03 2014 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |  2 +
 .../tserver/ActiveAssignmentRunnable.java       | 78 ++++++++++++++++++++
 .../accumulo/tserver/RunnableStartedAt.java     | 51 +++++++++++++
 .../apache/accumulo/tserver/TabletServer.java   | 13 ++--
 .../tserver/TabletServerResourceManager.java    | 75 +++++++++++++++++--
 .../accumulo/tserver/AssignmentWatcherTest.java | 65 ++++++++++++++++
 6 files changed, 271 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6236c4d4/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 11c8136,aec7af5..f59b654
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -287,12 -274,12 +287,14 @@@ public enum Property 
        "The number of threads for the distributed work queue. These threads 
are used for copying failed bulk files."),
    TSERV_WAL_SYNC("tserver.wal.sync", "true", PropertyType.BOOLEAN,
        "Use the SYNC_BLOCK create flag to sync WAL writes to disk. Prevents 
problems recovering from sudden system resets."),
 -  TSERV_WAL_SYNC_METHOD("tserver.wal.sync.method", "hsync", 
PropertyType.STRING, "The method to invoke when sync'ing WALs. HSync will 
provide " +
 -      "resiliency in the face of unexpected power outages, at the cost of 
speed. If method is not available, the legacy 'sync' method " +
 -      "will be used to ensure backwards compatibility with older Hadoop 
versions. A value of 'hflush' is the alternative to the default value " +
 -      "of 'hsync' which will result in faster writes, but with less 
durability"),
 +  @Deprecated
 +  TSERV_WAL_SYNC_METHOD("tserver.wal.sync.method", "hsync", 
PropertyType.STRING, "This property is deprecated. Use table.durability 
instead."),
+   TSERV_ASSIGNMENT_DURATION_WARNING("tserver.assignment.duration.warning", 
"10m", PropertyType.TIMEDURATION, "The amount of time an assignment can run "
+       + " before the server will print a warning along with the current stack 
trace. Meant to help debug stuck assignments"),
 +  TSERV_REPLICATION_REPLAYERS("tserver.replication.replayer.", null, 
PropertyType.PREFIX, "Allows configuration of implementation used to apply 
replicated data"),
 +  TSERV_REPLICATION_DEFAULT_HANDLER("tserver.replication.default.replayer", 
"org.apache.accumulo.tserver.replication.BatchWriterReplicationReplayer",
 +      PropertyType.CLASSNAME, "Default AccumuloReplicationReplayer 
implementation"),
 +  
TSERV_REPLICATION_BW_REPLAYER_MEMORY("tserver.replication.batchwriter.replayer.memory",
 "50M", PropertyType.MEMORY, "Memory to provide to batchwriter to replay 
mutations for replication"),
  
    // properties that are specific to logger server behavior
    LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this 
category affect the behavior of the write-ahead logger servers"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6236c4d4/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 913feed,8ef44da..1e81947
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -2019,12 -2825,12 +2020,12 @@@ public class TabletServer implements Ru
      }
    }
  
-   private class AssignmentHandler implements Runnable {
+   protected class AssignmentHandler implements Runnable {
 -    private KeyExtent extent;
 -    private int retryAttempt = 0;
 +    private final KeyExtent extent;
 +    private final int retryAttempt;
  
      public AssignmentHandler(KeyExtent extent) {
 -      this.extent = extent;
 +      this(extent, 0);
      }
  
      public AssignmentHandler(KeyExtent extent, int retryAttempt) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6236c4d4/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 7806bbc,7c0eedc..ba86522
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@@ -53,7 -52,10 +54,8 @@@ import org.apache.accumulo.server.table
  import org.apache.accumulo.server.tabletserver.MemoryManager;
  import org.apache.accumulo.server.tabletserver.TabletState;
  import org.apache.accumulo.server.util.time.SimpleTimer;
 -import org.apache.accumulo.trace.instrument.TraceExecutorService;
  import org.apache.accumulo.tserver.FileManager.ScanFileManager;
 -import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
+ import org.apache.accumulo.tserver.TabletServer.AssignmentHandler;
  import org.apache.accumulo.tserver.compaction.CompactionStrategy;
  import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
  import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
@@@ -69,29 -70,31 +71,31 @@@ import org.apache.log4j.Logger
   */
  public class TabletServerResourceManager {
  
 -  private ExecutorService minorCompactionThreadPool;
 -  private ExecutorService majorCompactionThreadPool;
 -  private ExecutorService rootMajorCompactionThreadPool;
 -  private ExecutorService defaultMajorCompactionThreadPool;
 -  private ExecutorService splitThreadPool;
 -  private ExecutorService defaultSplitThreadPool;
 -  private ExecutorService defaultMigrationPool;
 -  private ExecutorService migrationPool;
 -  private ExecutorService assignmentPool;
 -  private ExecutorService assignMetaDataPool;
 -  private ExecutorService readAheadThreadPool;
 -  private ExecutorService defaultReadAheadThreadPool;
 -  private Map<String,ExecutorService> threadPools = new 
TreeMap<String,ExecutorService>();
 +  private static final Logger log = 
Logger.getLogger(TabletServerResourceManager.class);
  
 -  private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> 
activeAssignments;
 +  private final ExecutorService minorCompactionThreadPool;
 +  private final ExecutorService majorCompactionThreadPool;
 +  private final ExecutorService rootMajorCompactionThreadPool;
 +  private final ExecutorService defaultMajorCompactionThreadPool;
 +  private final ExecutorService splitThreadPool;
 +  private final ExecutorService defaultSplitThreadPool;
 +  private final ExecutorService defaultMigrationPool;
 +  private final ExecutorService migrationPool;
 +  private final ExecutorService assignmentPool;
 +  private final ExecutorService assignMetaDataPool;
 +  private final ExecutorService readAheadThreadPool;
 +  private final ExecutorService defaultReadAheadThreadPool;
 +  private final Map<String,ExecutorService> threadPools = new 
TreeMap<String,ExecutorService>();
  
 -  private HashSet<TabletResourceManager> tabletResources;
++  private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> 
activeAssignments;
+ 
    private final VolumeManager fs;
  
 -  private FileManager fileManager;
 +  private final FileManager fileManager;
  
 -  private MemoryManager memoryManager;
 +  private final MemoryManager memoryManager;
  
 -  private MemoryManagementFramework memMgmt;
 +  private final MemoryManagementFramework memMgmt;
  
    private final LruBlockCache _dCache;
    private final LruBlockCache _iCache;
@@@ -207,6 -215,61 +213,61 @@@
      memoryManager.init(conf);
      memMgmt = new MemoryManagementFramework();
      memMgmt.startThreads();
+ 
 -    SimpleTimer timer = SimpleTimer.getInstance();
++    SimpleTimer timer = SimpleTimer.getInstance(conf.getConfiguration());
+ 
+     // We can use the same map for both metadata and normal assignments since 
the keyspace (extent)
+     // is guaranteed to be unique. Schedule the task once, the task will 
reschedule itself.
+     timer.schedule(new AssignmentWatcher(acuConf, activeAssignments, timer), 
5000);
+   }
+ 
+   /**
+    * Accepts some map which is tracking active assignment task(s) (running) 
and monitors them to ensure that the time the assignment(s) have been running 
don't
+    * exceed a threshold. If the time is exceeded a warning is printed and a 
stack trace is logged for the running assignment.
+    */
+   protected static class AssignmentWatcher implements Runnable {
+     private static final Logger log = 
Logger.getLogger(AssignmentWatcher.class);
+ 
+     private final Map<KeyExtent,RunnableStartedAt> activeAssignments;
+     private final AccumuloConfiguration conf;
+     private final SimpleTimer timer;
+ 
+     public AssignmentWatcher(AccumuloConfiguration conf, 
Map<KeyExtent,RunnableStartedAt> activeAssignments, SimpleTimer timer) {
+       this.conf = conf;
+       this.activeAssignments = activeAssignments;
+       this.timer = timer;
+     }
+ 
+     @Override
+     public void run() {
+       final long millisBeforeWarning = 
conf.getTimeInMillis(Property.TSERV_ASSIGNMENT_DURATION_WARNING);
+       try {
+         long now = System.currentTimeMillis();
+         KeyExtent extent;
+         RunnableStartedAt runnable;
+         for (Entry<KeyExtent,RunnableStartedAt> entry : 
activeAssignments.entrySet()) {
+           extent = entry.getKey();
+           runnable = entry.getValue();
+           final long duration = now - runnable.getStartTime();
+ 
+           // Print a warning if an assignment has been running for over the 
configured time length
+           if (duration > millisBeforeWarning) {
+             log.warn("Assignment for " + extent + " has been running for at 
least " + duration + "ms", runnable.getTask().getException());
+           } else if (log.isTraceEnabled()) {
+             log.trace("Assignment for " + extent + " only running for " + 
duration + "ms");
+           }
+         }
+       } catch (Exception e) {
+         log.warn("Caught exception checking active assignments", e);
+       } finally {
+         // Don't run more often than every 5s
+         long delay = Math.max((long) (millisBeforeWarning * 0.5), 5000l);
+         if (log.isTraceEnabled()) {
+           log.trace("Rescheduling assignment watcher to run in " + delay + 
"ms");
+         }
+         timer.schedule(this, delay);
+       }
+     }
    }
  
    private static class TabletStateImpl implements TabletState, Cloneable {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6236c4d4/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java
----------------------------------------------------------------------
diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java
index 0000000,8dfc256..6ce893c
mode 000000,100644..100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java
@@@ -1,0 -1,66 +1,65 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.tserver;
+ 
+ import java.util.HashMap;
+ import java.util.Map;
+ 
+ import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.KeyExtent;
+ import org.apache.accumulo.server.util.time.SimpleTimer;
+ import 
org.apache.accumulo.tserver.TabletServerResourceManager.AssignmentWatcher;
+ import org.apache.hadoop.io.Text;
+ import org.easymock.EasyMock;
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ public class AssignmentWatcherTest {
+ 
+   private Map<KeyExtent,RunnableStartedAt> assignments;
+   private SimpleTimer timer;
+   private AccumuloConfiguration conf;
+   private AssignmentWatcher watcher;
+ 
+   @Before
+   public void setup() {
+     assignments = new HashMap<KeyExtent,RunnableStartedAt>();
+     timer = EasyMock.createMock(SimpleTimer.class);
+     conf = EasyMock.createMock(AccumuloConfiguration.class);
+     watcher = new AssignmentWatcher(conf, assignments, timer);
+   }
+ 
+   @Test
+   public void testAssignmentWarning() {
+     ActiveAssignmentRunnable task = 
EasyMock.createMock(ActiveAssignmentRunnable.class);
+     RunnableStartedAt run = new RunnableStartedAt(task, 
System.currentTimeMillis());
+     
EasyMock.expect(conf.getTimeInMillis(Property.TSERV_ASSIGNMENT_DURATION_WARNING)).andReturn(0l);
+ 
+     assignments.put(new KeyExtent(new Text("1"), null, null), run);
+ 
+     EasyMock.expect(task.getException()).andReturn(new Exception("Assignment 
warning happened"));
 -    timer.schedule(watcher, 5000l);
 -    EasyMock.expectLastCall();
++    EasyMock.expect(timer.schedule(watcher, 5000l)).andReturn(null);
+ 
+     EasyMock.replay(timer, conf, task);
+ 
+     watcher.run();
+ 
+     EasyMock.verify(timer, conf, task);
+   }
+ 
+ }

Reply via email to