Merge branch '1.6'

Conflicts:
	core/src/main/java/org/apache/accumulo/core/conf/Property.java
	server/tserver/pom.xml
	server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6236c4d4 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6236c4d4 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6236c4d4 Branch: refs/heads/master Commit: 6236c4d4761f11517c0c6b808b1f95072b668e20 Parents: 7e0121d 1f3288b Author: Josh Elser <els...@apache.org> Authored: Sat Nov 8 16:51:03 2014 -0500 Committer: Josh Elser <els...@apache.org> Committed: Sat Nov 8 16:51:03 2014 -0500 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 2 + .../tserver/ActiveAssignmentRunnable.java | 78 ++++++++++++++++++++ .../accumulo/tserver/RunnableStartedAt.java | 51 +++++++++++++ .../apache/accumulo/tserver/TabletServer.java | 13 ++-- .../tserver/TabletServerResourceManager.java | 75 +++++++++++++++++-- .../accumulo/tserver/AssignmentWatcherTest.java | 65 ++++++++++++++++ 6 files changed, 271 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/6236c4d4/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index 11c8136,aec7af5..f59b654 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -287,12 -274,12 +287,14 @@@ public enum Property "The number of threads for the distributed work queue. These threads are used for copying failed bulk files."), TSERV_WAL_SYNC("tserver.wal.sync", "true", PropertyType.BOOLEAN, "Use the SYNC_BLOCK create flag to sync WAL writes to disk. 
Prevents problems recovering from sudden system resets."), - TSERV_WAL_SYNC_METHOD("tserver.wal.sync.method", "hsync", PropertyType.STRING, "The method to invoke when sync'ing WALs. HSync will provide " + - "resiliency in the face of unexpected power outages, at the cost of speed. If method is not available, the legacy 'sync' method " + - "will be used to ensure backwards compatibility with older Hadoop versions. A value of 'hflush' is the alternative to the default value " + - "of 'hsync' which will result in faster writes, but with less durability"), + @Deprecated + TSERV_WAL_SYNC_METHOD("tserver.wal.sync.method", "hsync", PropertyType.STRING, "This property is deprecated. Use table.durability instead."), + TSERV_ASSIGNMENT_DURATION_WARNING("tserver.assignment.duration.warning", "10m", PropertyType.TIMEDURATION, "The amount of time an assignment can run " + + " before the server will print a warning along with the current stack trace. Meant to help debug stuck assignments"), + TSERV_REPLICATION_REPLAYERS("tserver.replication.replayer.", null, PropertyType.PREFIX, "Allows configuration of implementation used to apply replicated data"), + TSERV_REPLICATION_DEFAULT_HANDLER("tserver.replication.default.replayer", "org.apache.accumulo.tserver.replication.BatchWriterReplicationReplayer", + PropertyType.CLASSNAME, "Default AccumuloReplicationReplayer implementation"), + TSERV_REPLICATION_BW_REPLAYER_MEMORY("tserver.replication.batchwriter.replayer.memory", "50M", PropertyType.MEMORY, "Memory to provide to batchwriter to replay mutations for replication"), // properties that are specific to logger server behavior LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"), http://git-wip-us.apache.org/repos/asf/accumulo/blob/6236c4d4/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 913feed,8ef44da..1e81947 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -2019,12 -2825,12 +2020,12 @@@ public class TabletServer implements Ru } } - private class AssignmentHandler implements Runnable { + protected class AssignmentHandler implements Runnable { - private KeyExtent extent; - private int retryAttempt = 0; + private final KeyExtent extent; + private final int retryAttempt; public AssignmentHandler(KeyExtent extent) { - this.extent = extent; + this(extent, 0); } public AssignmentHandler(KeyExtent extent, int retryAttempt) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6236c4d4/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 7806bbc,7c0eedc..ba86522 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@@ -53,7 -52,10 +54,8 @@@ import org.apache.accumulo.server.table import org.apache.accumulo.server.tabletserver.MemoryManager; import org.apache.accumulo.server.tabletserver.TabletState; import org.apache.accumulo.server.util.time.SimpleTimer; -import org.apache.accumulo.trace.instrument.TraceExecutorService; import org.apache.accumulo.tserver.FileManager.ScanFileManager; -import org.apache.accumulo.tserver.Tablet.MinorCompactionReason; + import org.apache.accumulo.tserver.TabletServer.AssignmentHandler; import org.apache.accumulo.tserver.compaction.CompactionStrategy; import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy; import 
org.apache.accumulo.tserver.compaction.MajorCompactionReason; @@@ -69,29 -70,31 +71,31 @@@ import org.apache.log4j.Logger */ public class TabletServerResourceManager { - private ExecutorService minorCompactionThreadPool; - private ExecutorService majorCompactionThreadPool; - private ExecutorService rootMajorCompactionThreadPool; - private ExecutorService defaultMajorCompactionThreadPool; - private ExecutorService splitThreadPool; - private ExecutorService defaultSplitThreadPool; - private ExecutorService defaultMigrationPool; - private ExecutorService migrationPool; - private ExecutorService assignmentPool; - private ExecutorService assignMetaDataPool; - private ExecutorService readAheadThreadPool; - private ExecutorService defaultReadAheadThreadPool; - private Map<String,ExecutorService> threadPools = new TreeMap<String,ExecutorService>(); + private static final Logger log = Logger.getLogger(TabletServerResourceManager.class); - private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments; + private final ExecutorService minorCompactionThreadPool; + private final ExecutorService majorCompactionThreadPool; + private final ExecutorService rootMajorCompactionThreadPool; + private final ExecutorService defaultMajorCompactionThreadPool; + private final ExecutorService splitThreadPool; + private final ExecutorService defaultSplitThreadPool; + private final ExecutorService defaultMigrationPool; + private final ExecutorService migrationPool; + private final ExecutorService assignmentPool; + private final ExecutorService assignMetaDataPool; + private final ExecutorService readAheadThreadPool; + private final ExecutorService defaultReadAheadThreadPool; + private final Map<String,ExecutorService> threadPools = new TreeMap<String,ExecutorService>(); - private HashSet<TabletResourceManager> tabletResources; ++ private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments; + private final VolumeManager fs; - private FileManager fileManager; + 
private final FileManager fileManager; - private MemoryManager memoryManager; + private final MemoryManager memoryManager; - private MemoryManagementFramework memMgmt; + private final MemoryManagementFramework memMgmt; private final LruBlockCache _dCache; private final LruBlockCache _iCache; @@@ -207,6 -215,61 +213,61 @@@ memoryManager.init(conf); memMgmt = new MemoryManagementFramework(); memMgmt.startThreads(); + - SimpleTimer timer = SimpleTimer.getInstance(); ++ SimpleTimer timer = SimpleTimer.getInstance(conf.getConfiguration()); + + // We can use the same map for both metadata and normal assignments since the keyspace (extent) + // is guaranteed to be unique. Schedule the task once, the task will reschedule itself. + timer.schedule(new AssignmentWatcher(acuConf, activeAssignments, timer), 5000); + } + + /** + * Accepts some map which is tracking active assignment task(s) (running) and monitors them to ensure that the time the assignment(s) have been running don't + * exceed a threshold. If the time is exceeded a warning is printed and a stack trace is logged for the running assignment. 
+ */ + protected static class AssignmentWatcher implements Runnable { + private static final Logger log = Logger.getLogger(AssignmentWatcher.class); + + private final Map<KeyExtent,RunnableStartedAt> activeAssignments; + private final AccumuloConfiguration conf; + private final SimpleTimer timer; + + public AssignmentWatcher(AccumuloConfiguration conf, Map<KeyExtent,RunnableStartedAt> activeAssignments, SimpleTimer timer) { + this.conf = conf; + this.activeAssignments = activeAssignments; + this.timer = timer; + } + + @Override + public void run() { + final long millisBeforeWarning = conf.getTimeInMillis(Property.TSERV_ASSIGNMENT_DURATION_WARNING); + try { + long now = System.currentTimeMillis(); + KeyExtent extent; + RunnableStartedAt runnable; + for (Entry<KeyExtent,RunnableStartedAt> entry : activeAssignments.entrySet()) { + extent = entry.getKey(); + runnable = entry.getValue(); + final long duration = now - runnable.getStartTime(); + + // Print a warning if an assignment has been running for over the configured time length + if (duration > millisBeforeWarning) { + log.warn("Assignment for " + extent + " has been running for at least " + duration + "ms", runnable.getTask().getException()); + } else if (log.isTraceEnabled()) { + log.trace("Assignment for " + extent + " only running for " + duration + "ms"); + } + } + } catch (Exception e) { + log.warn("Caught exception checking active assignments", e); + } finally { + // Don't run more often than every 5s + long delay = Math.max((long) (millisBeforeWarning * 0.5), 5000l); + if (log.isTraceEnabled()) { + log.trace("Rescheduling assignment watcher to run in " + delay + "ms"); + } + timer.schedule(this, delay); + } + } } private static class TabletStateImpl implements TabletState, Cloneable { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6236c4d4/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java ---------------------------------------------------------------------- diff --cc 
server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java index 0000000,8dfc256..6ce893c mode 000000,100644..100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java @@@ -1,0 -1,66 +1,65 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.accumulo.tserver; + + import java.util.HashMap; + import java.util.Map; + + import org.apache.accumulo.core.conf.AccumuloConfiguration; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.KeyExtent; + import org.apache.accumulo.server.util.time.SimpleTimer; + import org.apache.accumulo.tserver.TabletServerResourceManager.AssignmentWatcher; + import org.apache.hadoop.io.Text; + import org.easymock.EasyMock; + import org.junit.Before; + import org.junit.Test; + + public class AssignmentWatcherTest { + + private Map<KeyExtent,RunnableStartedAt> assignments; + private SimpleTimer timer; + private AccumuloConfiguration conf; + private AssignmentWatcher watcher; + + @Before + public void setup() { + assignments = new HashMap<KeyExtent,RunnableStartedAt>(); + timer = EasyMock.createMock(SimpleTimer.class); + conf = EasyMock.createMock(AccumuloConfiguration.class); + watcher = new AssignmentWatcher(conf, assignments, timer); + } + + @Test + public void testAssignmentWarning() { + ActiveAssignmentRunnable task = EasyMock.createMock(ActiveAssignmentRunnable.class); + RunnableStartedAt run = new RunnableStartedAt(task, System.currentTimeMillis()); + EasyMock.expect(conf.getTimeInMillis(Property.TSERV_ASSIGNMENT_DURATION_WARNING)).andReturn(0l); + + assignments.put(new KeyExtent(new Text("1"), null, null), run); + + EasyMock.expect(task.getException()).andReturn(new Exception("Assignment warning happened")); - timer.schedule(watcher, 5000l); - EasyMock.expectLastCall(); ++ EasyMock.expect(timer.schedule(watcher, 5000l)).andReturn(null); + + EasyMock.replay(timer, conf, task); + + watcher.run(); + + EasyMock.verify(timer, conf, task); + } + + }