Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT

Conflicts:
    server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bf0b7f78
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bf0b7f78
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bf0b7f78

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: bf0b7f78b52f5d61e84792888ce479c758e4028d
Parents: 43cebf8 d11acbe
Author: Bill Havanki <bhava...@cloudera.com>
Authored: Wed Mar 12 16:49:16 2014 -0400
Committer: Bill Havanki <bhava...@cloudera.com>
Committed: Wed Mar 12 16:49:16 2014 -0400

----------------------------------------------------------------------
 .../TabletServerResourceManager.java          | 21 ++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf0b7f78/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
index e0dbead,0000000..57cd49b
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
@@@ -1,803 -1,0 +1,808 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.trace.instrument.TraceExecutorService;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
+import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
+import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+
+/**
+ * ResourceManager is responsible for managing the resources of all tablets within a tablet server.
+ *
+ *
+ *
+ */
+public class TabletServerResourceManager {
+
+  private ExecutorService minorCompactionThreadPool;
+  private ExecutorService majorCompactionThreadPool;
+  private ExecutorService rootMajorCompactionThreadPool;
+  private ExecutorService defaultMajorCompactionThreadPool;
+  private ExecutorService splitThreadPool;
+  private ExecutorService defaultSplitThreadPool;
+  private ExecutorService defaultMigrationPool;
+  private ExecutorService migrationPool;
+  private ExecutorService assignmentPool;
+  private ExecutorService assignMetaDataPool;
+  private ExecutorService readAheadThreadPool;
+  private ExecutorService defaultReadAheadThreadPool;
+  private Map<String,ExecutorService> threadPools = new TreeMap<String,ExecutorService>();
+
+  private HashSet<TabletResourceManager> tabletResources;
+
+  private FileManager fileManager;
+
+  private MemoryManager memoryManager;
+
+  private MemoryManagementFramework memMgmt;
+
+  private final LruBlockCache _dCache;
+  private final LruBlockCache _iCache;
+  private final ServerConfiguration conf;
+
+  private static final Logger log = Logger.getLogger(TabletServerResourceManager.class);
+
+  private ExecutorService addEs(String name, ExecutorService tp) {
+    if (threadPools.containsKey(name)) {
+      throw new IllegalArgumentException("Cannot create two executor services with the same name " + name);
+    }
+    tp = new TraceExecutorService(tp);
+    threadPools.put(name, tp);
+    return tp;
+  }
+
+  private ExecutorService addEs(final Property maxThreads, String name, final ThreadPoolExecutor tp) {
+    ExecutorService result = addEs(name, tp);
+    SimpleTimer.getInstance().schedule(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          int max = conf.getConfiguration().getCount(maxThreads);
+          if (tp.getMaximumPoolSize() != max) {
+            log.info("Changing " + maxThreads.getKey() + " to " + max);
+            tp.setCorePoolSize(max);
+            tp.setMaximumPoolSize(max);
+          }
+        } catch (Throwable t) {
+          log.error(t, t);
+        }
+      }
+
+    }, 1000, 10 * 1000);
+    return result;
+  }
+
+  private ExecutorService createEs(int max, String name) {
+    return addEs(name, Executors.newFixedThreadPool(max, new NamingThreadFactory(name)));
+  }
+
+  private ExecutorService createEs(Property max, String name) {
+    return createEs(max, name, new LinkedBlockingQueue<Runnable>());
+  }
+
+  private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
+    int maxThreads = conf.getConfiguration().getCount(max);
+    ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
+    return addEs(max, name, tp);
+  }
+
+  private ExecutorService createEs(int min, int max, int timeout, String name) {
+    return addEs(name, new ThreadPoolExecutor(min, max, timeout, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(name)));
+  }
+
+  public TabletServerResourceManager(Instance instance, FileSystem fs) {
+    this.conf = new ServerConfiguration(instance);
+    final AccumuloConfiguration acuConf = conf.getConfiguration();
+
+    long maxMemory = acuConf.getMemoryInBytes(Property.TSERV_MAXMEM);
+    boolean usingNativeMap = acuConf.getBoolean(Property.TSERV_NATIVEMAP_ENABLED) && NativeMap.loadedNativeLibraries();
+
+    long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
+    long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE);
+    long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE);
+
+    _iCache = new LruBlockCache(iCacheSize, blockSize);
+    _dCache = new LruBlockCache(dCacheSize, blockSize);
+
+    Runtime runtime = Runtime.getRuntime();
+    if (!usingNativeMap && maxMemory + dCacheSize + iCacheSize > runtime.maxMemory()) {
+      throw new IllegalArgumentException(String.format(
+          "Maximum tablet server map memory %,d and block cache sizes %,d are too large for this JVM configuration %,d", maxMemory, dCacheSize + iCacheSize,
+          runtime.maxMemory()));
+    }
+    runtime.gc();
+
+    // totalMemory - freeMemory = memory in use
+    // maxMemory - memory in use = max available memory
+    if (!usingNativeMap && maxMemory > runtime.maxMemory() - (runtime.totalMemory() - runtime.freeMemory())) {
+      log.warn("In-memory map may not fit into local memory space.");
+    }
+
+    minorCompactionThreadPool = createEs(Property.TSERV_MINC_MAXCONCURRENT, "minor compactor");
+
+    // make this thread pool have a priority queue... and execute tablets with the most
+    // files first!
+    majorCompactionThreadPool = createEs(Property.TSERV_MAJC_MAXCONCURRENT, "major compactor", new CompactionQueue());
+    rootMajorCompactionThreadPool = createEs(0, 1, 300, "md root major compactor");
+    defaultMajorCompactionThreadPool = createEs(0, 1, 300, "md major compactor");
+
+    splitThreadPool = createEs(1, "splitter");
+    defaultSplitThreadPool = createEs(0, 1, 60, "md splitter");
+
+    defaultMigrationPool = createEs(0, 1, 60, "metadata tablet migration");
+    migrationPool = createEs(Property.TSERV_MIGRATE_MAXCONCURRENT, "tablet migration");
+
+    // not sure if concurrent assignments can run safely... even if they could there is probably no benefit at startup because
+    // individual tablet servers are already running assignments concurrently... having each individual tablet server run
+    // concurrent assignments would put more load on the metadata table at startup
+    assignmentPool = createEs(1, "tablet assignment");
+
+    assignMetaDataPool = createEs(0, 1, 60, "metadata tablet assignment");
+
+    readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead");
+    defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, "metadata tablets read ahead");
+
+    tabletResources = new HashSet<TabletResourceManager>();
+
+    int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
+
+    fileManager = new FileManager(conf, fs, maxOpenFiles, _dCache, _iCache);
+
+    try {
+      Class<? extends MemoryManager> clazz = AccumuloVFSClassLoader.loadClass(acuConf.get(Property.TSERV_MEM_MGMT), MemoryManager.class);
+      memoryManager = clazz.newInstance();
+      memoryManager.init(conf);
+      log.debug("Loaded memory manager : " + memoryManager.getClass().getName());
+    } catch (Exception e) {
+      log.error("Failed to find memory manager in config, using default", e);
+    }
+
+    if (memoryManager == null) {
+      memoryManager = new LargestFirstMemoryManager();
+    }
+
+    memMgmt = new MemoryManagementFramework();
++    memMgmt.startThreads();
+  }
+
+  private static class TabletStateImpl implements TabletState, Cloneable {
+
+    private long lct;
+    private Tablet tablet;
+    private long mts;
+    private long mcmts;
+
+    public TabletStateImpl(Tablet t, long mts, long lct, long mcmts) {
+      this.tablet = t;
+      this.mts = mts;
+      this.lct = lct;
+      this.mcmts = mcmts;
+    }
+
+    public KeyExtent getExtent() {
+      return tablet.getExtent();
+    }
+
+    Tablet getTablet() {
+      return tablet;
+    }
+
+    public long getLastCommitTime() {
+      return lct;
+    }
+
+    public long getMemTableSize() {
+      return mts;
+    }
+
+    public long getMinorCompactingMemTableSize() {
+      return mcmts;
+    }
+  }
+
+  private class MemoryManagementFramework {
+    private final Map<KeyExtent,TabletStateImpl> tabletReports;
+    private LinkedBlockingQueue<TabletStateImpl> memUsageReports;
+    private long lastMemCheckTime = System.currentTimeMillis();
+    private long maxMem;
++    private Thread memoryGuardThread;
++    private Thread minorCompactionInitiatorThread;
+
+    MemoryManagementFramework() {
+      tabletReports = Collections.synchronizedMap(new HashMap<KeyExtent,TabletStateImpl>());
+      memUsageReports = new LinkedBlockingQueue<TabletStateImpl>();
+      maxMem = conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM);
+
+      Runnable r1 = new Runnable() {
+        public void run() {
+          processTabletMemStats();
+        }
+      };
+
-      Thread t1 = new Daemon(new LoggingRunnable(log, r1));
-      t1.setPriority(Thread.NORM_PRIORITY + 1);
-      t1.setName("Accumulo Memory Guard");
-      t1.start();
++      memoryGuardThread = new Daemon(new LoggingRunnable(log, r1));
++      memoryGuardThread.setPriority(Thread.NORM_PRIORITY + 1);
++      memoryGuardThread.setName("Accumulo Memory Guard");
+
+      Runnable r2 = new Runnable() {
+        public void run() {
+          manageMemory();
+        }
+      };
+
-      Thread t2 = new Daemon(new LoggingRunnable(log, r2));
-      t2.setName("Accumulo Minor Compaction Initiator");
-      t2.start();
-
++      minorCompactionInitiatorThread = new Daemon(new LoggingRunnable(log, r2));
++      minorCompactionInitiatorThread.setName("Accumulo Minor Compaction Initiator");
++    }
++
++    void startThreads() {
++      memoryGuardThread.start();
++      minorCompactionInitiatorThread.start();
+    }
+
+    private long lastMemTotal = 0;
+
+    private void processTabletMemStats() {
+      while (true) {
+        try {
+
+          TabletStateImpl report = memUsageReports.take();
+
+          while (report != null) {
+            tabletReports.put(report.getExtent(), report);
+            report = memUsageReports.poll();
+          }
+
+          long delta = System.currentTimeMillis() - lastMemCheckTime;
+          if (holdCommits || delta > 50 || lastMemTotal > 0.90 * maxMem) {
+            lastMemCheckTime = System.currentTimeMillis();
+
+            long totalMemUsed = 0;
+
+            synchronized (tabletReports) {
+              for (TabletStateImpl tsi : tabletReports.values()) {
+                totalMemUsed += tsi.getMemTableSize();
+                totalMemUsed += tsi.getMinorCompactingMemTableSize();
+              }
+            }
+
+            if (totalMemUsed > 0.95 * maxMem) {
+              holdAllCommits(true);
+            } else {
+              holdAllCommits(false);
+            }
+
+            lastMemTotal = totalMemUsed;
+          }
+
+        } catch (InterruptedException e) {
+          log.warn(e, e);
+        }
+      }
+
+    }
+
+    private void manageMemory() {
+      while (true) {
+        MemoryManagementActions mma = null;
+
+        try {
+          ArrayList<TabletState> tablets;
+          synchronized (tabletReports) {
+            tablets = new ArrayList<TabletState>(tabletReports.values());
+          }
+          mma = memoryManager.getMemoryManagementActions(tablets);
+
+        } catch (Throwable t) {
+          log.error("Memory manager failed " + t.getMessage(), t);
+        }
+
+        try {
+          if (mma != null && mma.tabletsToMinorCompact != null && mma.tabletsToMinorCompact.size() > 0) {
+            for (KeyExtent keyExtent : mma.tabletsToMinorCompact) {
+              TabletStateImpl tabletReport = tabletReports.get(keyExtent);
+
+              if (tabletReport == null) {
+                log.warn("Memory manager asked to compact nonexistent tablet " + keyExtent);
+                continue;
+              }
+
+              if (!tabletReport.getTablet().initiateMinorCompaction(MinorCompactionReason.SYSTEM)) {
+                if (tabletReport.getTablet().isClosed()) {
+                  tabletReports.remove(tabletReport.getExtent());
+                  log.debug("Ignoring memory manager recommendation: not minor compacting closed tablet " + keyExtent);
+                } else {
+                  log.info("Ignoring memory manager recommendation: not minor compacting " + keyExtent);
+                }
+              }
+            }
+
+            // log.debug("mma.tabletsToMinorCompact = "+mma.tabletsToMinorCompact);
+          }
+        } catch (Throwable t) {
+          log.error("Minor compactions for memory management failed", t);
+        }
+
+        UtilWaitThread.sleep(250);
+      }
+    }
+
+    public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime, long mincSize) {
+      memUsageReports.add(new TabletStateImpl(tablet, size, lastCommitTime, mincSize));
+    }
+
+    public void tabletClosed(KeyExtent extent) {
+      tabletReports.remove(extent);
+    }
+  }
+
+  private final Object commitHold = new Object();
+  private volatile boolean holdCommits = false;
+  private long holdStartTime;
+
+  protected void holdAllCommits(boolean holdAllCommits) {
+    synchronized (commitHold) {
+      if (holdCommits != holdAllCommits) {
+        holdCommits = holdAllCommits;
+
+        if (holdCommits) {
+          holdStartTime = System.currentTimeMillis();
+        }
+
+        if (!holdCommits) {
+          log.debug(String.format("Commits held for %6.2f secs", (System.currentTimeMillis() - holdStartTime) / 1000.0));
+          commitHold.notifyAll();
+        }
+      }
+    }
+
+  }
+
+  void waitUntilCommitsAreEnabled() {
+    if (holdCommits) {
+      long timeout = System.currentTimeMillis() + conf.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
+      synchronized (commitHold) {
+        while (holdCommits) {
+          try {
+            if (System.currentTimeMillis() > timeout)
+              throw new HoldTimeoutException("Commits are held");
+            commitHold.wait(1000);
+          } catch (InterruptedException e) {}
+        }
+      }
+    }
+  }
+
+  public long holdTime() {
+    if (!holdCommits)
+      return 0;
+    synchronized (commitHold) {
+      return System.currentTimeMillis() - holdStartTime;
+    }
+  }
+
+  public void close() {
+    for (ExecutorService executorService : threadPools.values()) {
+      executorService.shutdown();
+    }
+
+    for (Entry<String,ExecutorService> entry : threadPools.entrySet()) {
+      while (true) {
+        try {
+          if (entry.getValue().awaitTermination(60, TimeUnit.SECONDS))
+            break;
+          log.info("Waiting for thread pool " + entry.getKey() + " to shutdown");
+        } catch (InterruptedException e) {
+          log.warn(e);
+        }
+      }
+    }
+  }
+
+  public synchronized TabletResourceManager createTabletResourceManager() {
+    TabletResourceManager trm = new TabletResourceManager();
+    return trm;
+  }
+
+  synchronized private void addTabletResource(TabletResourceManager tr) {
+    tabletResources.add(tr);
+  }
+
+  synchronized private void removeTabletResource(TabletResourceManager tr) {
+    tabletResources.remove(tr);
+  }
+
+  private class MapFileInfo {
+    private final String path;
+    private final long size;
+
+    MapFileInfo(String path, long size) {
+      this.path = path;
+      this.size = size;
+    }
+  }
+
+  public class TabletResourceManager {
+
+    private final long creationTime = System.currentTimeMillis();
+
+    private volatile boolean openFilesReserved = false;
+
+    private volatile boolean closed = false;
+
+    private Tablet tablet;
+
+    private AccumuloConfiguration tableConf;
+
+    TabletResourceManager() {}
+
+    void setTablet(Tablet tablet, AccumuloConfiguration tableConf) {
+      this.tablet = tablet;
+      this.tableConf = tableConf;
+      // TabletResourceManager is not really initialized until this
+      // function is called.... so do not make it publicly available
+      // until now
+
+      addTabletResource(this);
+    }
+
+    // BEGIN methods that Tablets call to manage their set of open map files
+
+    public void importedMapFiles() {
+      lastReportedCommitTime = System.currentTimeMillis();
+    }
+
+    synchronized ScanFileManager newScanFileManager() {
+      if (closed)
+        throw new IllegalStateException("closed");
+      return fileManager.newScanFileManager(tablet.getExtent());
+    }
+
+    // END methods that Tablets call to manage their set of open map files
+
+    // BEGIN methods that Tablets call to manage memory
+
+    private AtomicLong lastReportedSize = new AtomicLong();
+    private AtomicLong lastReportedMincSize = new AtomicLong();
+    private volatile long lastReportedCommitTime = 0;
+
+    public void updateMemoryUsageStats(long size, long mincSize) {
+
+      // do not want to update stats for every little change,
+      // so only do it under certain circumstances... the reason
+      // for this is that reporting stats acquires a lock, do
+      // not want all tablets locking on the same lock for every
+      // commit
+      long totalSize = size + mincSize;
+      long lrs = lastReportedSize.get();
+      long delta = totalSize - lrs;
+      long lrms = lastReportedMincSize.get();
+      boolean report = false;
+      // the atomic longs are considered independently, when one is set
+      // the other is not set intentionally because this method is not
+      // synchronized... therefore there are not transactional semantics
+      // for reading and writing two variables
+      if ((lrms > 0 && mincSize == 0 || lrms == 0 && mincSize > 0) && lastReportedMincSize.compareAndSet(lrms, mincSize)) {
+        report = true;
+      }
+
+      long currentTime = System.currentTimeMillis();
+      if ((delta > 32000 || delta < 0 || (currentTime - lastReportedCommitTime > 1000)) && lastReportedSize.compareAndSet(lrs, totalSize)) {
+        if (delta > 0)
+          lastReportedCommitTime = currentTime;
+        report = true;
+      }
+
+      if (report)
+        memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize);
+    }
+
+    // END methods that Tablets call to manage memory
+
+    // BEGIN methods that Tablets call to make decisions about major compaction
+    // when too many files are open, we may want tablets to compact down
+    // to one map file
+    Map<String,Long> findMapFilesToCompact(SortedMap<String,DataFileValue> tabletFiles, MajorCompactionReason reason) {
+      if (reason == MajorCompactionReason.USER) {
+        Map<String,Long> files = new HashMap<String,Long>();
+        for (Entry<String,DataFileValue> entry : tabletFiles.entrySet()) {
+          files.put(entry.getKey(), entry.getValue().getSize());
+        }
+        return files;
+      }
+
+      if (tabletFiles.size() <= 1)
+        return null;
+      TreeSet<MapFileInfo> candidateFiles = new TreeSet<MapFileInfo>(new Comparator<MapFileInfo>() {
+        @Override
+        public int compare(MapFileInfo o1, MapFileInfo o2) {
+          if (o1 == o2)
+            return 0;
+          if (o1.size < o2.size)
+            return -1;
+          if (o1.size > o2.size)
+            return 1;
+          return o1.path.compareTo(o2.path);
+        }
+      });
+
+      double ratio = tableConf.getFraction(Property.TABLE_MAJC_RATIO);
+      int maxFilesToCompact = tableConf.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN);
+      int maxFilesPerTablet = tableConf.getMaxFilesPerTablet();
+
+      for (Entry<String,DataFileValue> entry : tabletFiles.entrySet()) {
+        candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize()));
+      }
+
+      long totalSize = 0;
+      for (MapFileInfo mfi : candidateFiles) {
+        totalSize += mfi.size;
+      }
+
+      Map<String,Long> files = new HashMap<String,Long>();
+
+      while (candidateFiles.size() > 1) {
+        MapFileInfo max = candidateFiles.last();
+        if (max.size * ratio <= totalSize) {
+          files.clear();
+          for (MapFileInfo mfi : candidateFiles) {
+            files.put(mfi.path, mfi.size);
+            if (files.size() >= maxFilesToCompact)
+              break;
+          }
+
+          break;
+        }
+        totalSize -= max.size;
+        candidateFiles.remove(max);
+      }
+
+      int totalFilesToCompact = 0;
+      if (tabletFiles.size() > maxFilesPerTablet)
+        totalFilesToCompact = tabletFiles.size() - maxFilesPerTablet + 1;
+
+      totalFilesToCompact = Math.min(totalFilesToCompact, maxFilesToCompact);
+
+      if (files.size() < totalFilesToCompact) {
+
+        TreeMap<String,DataFileValue> tfc = new TreeMap<String,DataFileValue>(tabletFiles);
+        tfc.keySet().removeAll(files.keySet());
+
+        // put data in candidateFiles to sort it
+        candidateFiles.clear();
+        for (Entry<String,DataFileValue> entry : tfc.entrySet())
+          candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize()));
+
+        for (MapFileInfo mfi : candidateFiles) {
+          files.put(mfi.path, mfi.size);
+          if (files.size() >= totalFilesToCompact)
+            break;
+        }
+      }
+
+      if (files.size() == 0)
+        return null;
+
+      return files;
+    }
+
+    boolean needsMajorCompaction(SortedMap<String,DataFileValue> tabletFiles, MajorCompactionReason reason) {
+      if (closed)
+        return false;// throw new IOException("closed");
+
+      // int threshold;
+
+      if (reason == MajorCompactionReason.USER)
+        return true;
+
+      if (reason == MajorCompactionReason.IDLE) {
+        // threshold = 1;
+        long idleTime;
+        if (lastReportedCommitTime == 0) {
+          // no commits, so compute how long the tablet has been assigned to the
+          // tablet server
+          idleTime = System.currentTimeMillis() - creationTime;
+        } else {
+          idleTime = System.currentTimeMillis() - lastReportedCommitTime;
+        }
+
+        if (idleTime < tableConf.getTimeInMillis(Property.TABLE_MAJC_COMPACTALL_IDLETIME)) {
+          return false;
+        }
+      }/*
+        * else{ threshold = tableConf.getCount(Property.TABLE_MAJC_THRESHOLD); }
+        */
+
+      return findMapFilesToCompact(tabletFiles, reason) != null;
+    }
+
+    // END methods that Tablets call to make decisions about major compaction
+
+    // tablets call this method to run minor compactions,
+    // this allows us to control how many minor compactions
+    // run concurrently in a tablet server
+    void executeMinorCompaction(final Runnable r) {
+      minorCompactionThreadPool.execute(new LoggingRunnable(log, r));
+    }
+
+    void close() throws IOException {
+      // always obtain locks in same order to avoid deadlock
+      synchronized (TabletServerResourceManager.this) {
+        synchronized (this) {
+          if (closed)
+            throw new IOException("closed");
+          if (openFilesReserved)
+            throw new IOException("tried to close files while open files reserved");
+
+          TabletServerResourceManager.this.removeTabletResource(this);
+
+          memMgmt.tabletClosed(tablet.getExtent());
+          memoryManager.tabletClosed(tablet.getExtent());
+
+          closed = true;
+        }
+      }
+    }
+
+    public TabletServerResourceManager getTabletServerResourceManager() {
+      return TabletServerResourceManager.this;
+    }
+
+    public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
+      TabletServerResourceManager.this.executeMajorCompaction(tablet, compactionTask);
+    }
+
+  }
+
+  public void executeSplit(KeyExtent tablet, Runnable splitTask) {
+    if (tablet.isMeta()) {
+      if (tablet.isRootTablet()) {
+        log.warn("Saw request to split root tablet, ignoring");
+        return;
+      }
+      defaultSplitThreadPool.execute(splitTask);
+    } else {
+      splitThreadPool.execute(splitTask);
+    }
+  }
+
+  public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
+    if (tablet.equals(Constants.ROOT_TABLET_EXTENT)) {
+      rootMajorCompactionThreadPool.execute(compactionTask);
+    } else if (tablet.isMeta()) {
+      defaultMajorCompactionThreadPool.execute(compactionTask);
+    } else {
+      majorCompactionThreadPool.execute(compactionTask);
+    }
+  }
+
+  public void executeReadAhead(KeyExtent tablet, Runnable task) {
+    if (tablet.isRootTablet()) {
+      task.run();
+    } else if (tablet.isMeta()) {
+      defaultReadAheadThreadPool.execute(task);
+    } else {
+      readAheadThreadPool.execute(task);
+    }
+  }
+
+  public void addAssignment(Runnable assignmentHandler) {
+    assignmentPool.execute(assignmentHandler);
+  }
+
+  public void addMetaDataAssignment(Runnable assignmentHandler) {
+    assignMetaDataPool.execute(assignmentHandler);
+  }
+
+  public void addMigration(KeyExtent tablet, Runnable migrationHandler) {
+    if (tablet.isRootTablet()) {
+      migrationHandler.run();
+    } else if (tablet.isMeta()) {
+      defaultMigrationPool.execute(migrationHandler);
+    } else {
+      migrationPool.execute(migrationHandler);
+    }
+  }
+
+  public void stopSplits() {
+    splitThreadPool.shutdown();
+    defaultSplitThreadPool.shutdown();
+    while (true) {
+      try {
+        while (!splitThreadPool.awaitTermination(1, TimeUnit.MINUTES)) {
+          log.info("Waiting for split thread pool to stop");
+        }
+        while (!defaultSplitThreadPool.awaitTermination(1, TimeUnit.MINUTES)) {
+          log.info("Waiting for metadata split thread pool to stop");
+        }
+        break;
+      } catch (InterruptedException ex) {
+        log.info(ex, ex);
+      }
+    }
+  }
+
+  public void stopNormalAssignments() {
+    assignmentPool.shutdown();
+    while (true) {
+      try {
+        while (!assignmentPool.awaitTermination(1, TimeUnit.MINUTES)) {
+          log.info("Waiting for assignment thread pool to stop");
+        }
+        break;
+      } catch (InterruptedException ex) {
+        log.info(ex, ex);
+      }
+    }
+  }
+
+  public void stopMetadataAssignments() {
+    assignMetaDataPool.shutdown();
+    while (true) {
+      try {
+        while (!assignMetaDataPool.awaitTermination(1, TimeUnit.MINUTES)) {
+          log.info("Waiting for metadata assignment thread pool to stop");
+        }
+        break;
+      } catch (InterruptedException ex) {
+        log.info(ex, ex);
+      }
+    }
+  }
+
+  public LruBlockCache getIndexCache() {
+    return _iCache;
+  }
+
+  public LruBlockCache getDataCache() {
+    return _dCache;
+  }
+
+}
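----------------------------------------------------------------------

The substantive change merged here is that MemoryManagementFramework no longer starts its two daemon threads inside its constructor; the threads are created there and started afterwards through the new startThreads() method, which TabletServerResourceManager calls once its own constructor has finished building the framework. The following is a minimal, self-contained sketch of that start-after-construction pattern, not Accumulo code: the class and method names (SimpleMemoryWatcher, guardMemory, initiateCompactions) are illustrative placeholders chosen for this example.

// Illustrative sketch only; names below are hypothetical and do not exist in Accumulo.
public class SimpleMemoryWatcher {

  private final Thread memoryGuardThread;
  private final Thread compactionInitiatorThread;

  public SimpleMemoryWatcher() {
    // Create and configure the threads, but do not start them here:
    // 'this' is not fully constructed yet, so a running thread could
    // observe a partially initialized object.
    memoryGuardThread = new Thread(this::guardMemory, "Memory Guard");
    memoryGuardThread.setDaemon(true);
    memoryGuardThread.setPriority(Thread.NORM_PRIORITY + 1);

    compactionInitiatorThread = new Thread(this::initiateCompactions, "Compaction Initiator");
    compactionInitiatorThread.setDaemon(true);
  }

  /** Called by the owner only after construction has completed. */
  public void startThreads() {
    memoryGuardThread.start();
    compactionInitiatorThread.start();
  }

  private void guardMemory() {
    // Placeholder work loop standing in for the real memory-guard logic.
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  private void initiateCompactions() {
    // Placeholder work loop standing in for the compaction-initiator logic.
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Thread.sleep(250);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  public static void main(String[] args) {
    SimpleMemoryWatcher watcher = new SimpleMemoryWatcher();
    watcher.startThreads(); // threads only run once the constructor has returned
  }
}

Deferring start() in this way avoids publishing a reference to the object to another thread before the constructor returns, which is the same concern the merged change addresses.

----------------------------------------------------------------------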