Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e856ea66 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e856ea66 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e856ea66 Branch: refs/heads/master Commit: e856ea6600aee9464a42f3122e7d8f2f7d23bbb1 Parents: 4d70739 fc1a4ff Author: Eric C. Newton <eric.new...@gmail.com> Authored: Thu Aug 7 14:13:56 2014 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Thu Aug 7 14:13:56 2014 -0400 ---------------------------------------------------------------------- .../tabletserver/LargestFirstMemoryManager.java | 73 ++++++++++++++------ 1 file changed, 50 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/e856ea66/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java index b891ad6,0000000..4554afd mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java @@@ -1,234 -1,0 +1,261 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.tabletserver; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +/** + * The LargestFirstMemoryManager attempts to keep memory between 80% and 90% full. It adapts over time the point at which it should start a compaction based on + * how full memory gets between successive calls. It will also flush idle tablets based on a per-table configurable idle time. It will only attempt to flush + * tablets up to 20% of all memory. And, as the name of the class would suggest, it flushes the tablet with the highest memory footprint. However, it actually + * chooses the tablet as a function of its size doubled for every 15 minutes of idle time. + */ +public class LargestFirstMemoryManager implements MemoryManager { + + private static final Logger log = Logger.getLogger(LargestFirstMemoryManager.class); + private static final long ZERO_TIME = System.currentTimeMillis(); + private static final int TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER = 2; + private static final double MAX_FLUSH_AT_ONCE_PERCENT = 0.20; + + private long maxMemory = -1; + private int maxConcurrentMincs; + private int numWaitingMultiplier; + private long prevIngestMemory; + // The fraction of memory that needs to be used before we begin flushing. + private double compactionThreshold; + private long maxObserved; + private final HashMap<Text,Long> mincIdleThresholds = new HashMap<Text,Long>(); + private ServerConfiguration config = null; + + private static class TabletInfo { + final KeyExtent extent; + final long memTableSize; + final long idleTime; + final long load; + + public TabletInfo(KeyExtent extent, long memTableSize, long idleTime, long load) { + this.extent = extent; + this.memTableSize = memTableSize; + this.idleTime = idleTime; + this.load = load; + } + } + + // A little map that will hold the "largest" N tablets, where largest is a result of the timeMemoryLoad function - @SuppressWarnings("serial") - private static class LargestMap extends TreeMap<Long,TabletInfo> { ++ private static class LargestMap { + final int max; ++ final TreeMap<Long, List<TabletInfo>> map = new TreeMap<Long, List<TabletInfo>>(); + + LargestMap(int n) { + max = n; + } + - @Override - public TabletInfo put(Long key, TabletInfo value) { - if (size() == max) { - if (key.compareTo(this.firstKey()) < 0) - return value; ++ public boolean put(Long key, TabletInfo value) { ++ if (map.size() == max) { ++ if (key.compareTo(map.firstKey()) < 0) ++ return false; + try { - return super.put(key, value); ++ add(key, value); ++ return true; + } finally { - super.remove(this.firstKey()); ++ map.remove(map.firstKey()); + } + } else { - return super.put(key, value); ++ add(key, value); ++ return true; + } + } ++ ++ private void add(Long key, TabletInfo value) { ++ List<TabletInfo> lst = map.get(key); ++ if (lst != null) { ++ lst.add(value); ++ } else { ++ lst = new ArrayList<TabletInfo>(); ++ lst.add(value); ++ map.put(key, lst); ++ } ++ } ++ ++ public boolean isEmpty() { ++ return map.isEmpty(); ++ } ++ ++ public Entry<Long,List<TabletInfo>> lastEntry() { ++ return map.lastEntry(); ++ } ++ ++ public void remove(Long key) { ++ map.remove(key); ++ } + } + + LargestFirstMemoryManager(long maxMemory, int maxConcurrentMincs, int numWaitingMultiplier) { + this(); + this.maxMemory = maxMemory; + this.maxConcurrentMincs = maxConcurrentMincs; + this.numWaitingMultiplier = numWaitingMultiplier; + } + + @Override + public void init(ServerConfiguration conf) { + this.config = conf; + maxMemory = conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM); + maxConcurrentMincs = conf.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT); + numWaitingMultiplier = TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER; + } + + public LargestFirstMemoryManager() { + prevIngestMemory = 0; + compactionThreshold = 0.5; + maxObserved = 0; + } + + private long getMinCIdleThreshold(KeyExtent extent) { + Text tableId = extent.getTableId(); + if (!mincIdleThresholds.containsKey(tableId)) + mincIdleThresholds.put(tableId, config.getTableConfiguration(tableId.toString()).getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME)); + return mincIdleThresholds.get(tableId); + } + + @Override + public MemoryManagementActions getMemoryManagementActions(List<TabletState> tablets) { + if (maxMemory < 0) + throw new IllegalStateException("need to initialize " + LargestFirstMemoryManager.class.getName()); + + final int maxMinCs = maxConcurrentMincs * numWaitingMultiplier; + + mincIdleThresholds.clear(); + final MemoryManagementActions result = new MemoryManagementActions(); + result.tabletsToMinorCompact = new ArrayList<KeyExtent>(); + - TreeMap<Long,TabletInfo> largestMemTablets = new LargestMap(maxMinCs); - final TreeMap<Long,TabletInfo> largestIdleMemTablets = new LargestMap(maxConcurrentMincs); ++ LargestMap largestMemTablets = new LargestMap(maxMinCs); ++ final LargestMap largestIdleMemTablets = new LargestMap(maxConcurrentMincs); + final long now = System.currentTimeMillis(); + + long ingestMemory = 0; + long compactionMemory = 0; + int numWaitingMincs = 0; + + // find the largest and most idle tablets + for (TabletState ts : tablets) { + final long memTabletSize = ts.getMemTableSize(); + final long minorCompactingSize = ts.getMinorCompactingMemTableSize(); + final long idleTime = now - Math.max(ts.getLastCommitTime(), ZERO_TIME); + final long timeMemoryLoad = timeMemoryLoad(memTabletSize, idleTime); + ingestMemory += memTabletSize; + if (minorCompactingSize == 0 && memTabletSize > 0) { + TabletInfo tabletInfo = new TabletInfo(ts.getExtent(), memTabletSize, idleTime, timeMemoryLoad); + largestMemTablets.put(timeMemoryLoad, tabletInfo); + if (idleTime > getMinCIdleThreshold(ts.getExtent())) { + largestIdleMemTablets.put(timeMemoryLoad, tabletInfo); + } + } + + compactionMemory += minorCompactingSize; + if (minorCompactingSize > 0) + numWaitingMincs++; + } + + if (ingestMemory + compactionMemory > maxObserved) { + maxObserved = ingestMemory + compactionMemory; + } + + final long memoryChange = ingestMemory - prevIngestMemory; + prevIngestMemory = ingestMemory; + + boolean startMinC = false; + + if (numWaitingMincs < maxMinCs) { + // based on previous ingest memory increase, if we think that the next increase will + // take us over the threshold for non-compacting memory, then start a minor compaction + // or if the idle time of the chosen tablet is greater than the threshold, start a minor compaction + if (memoryChange >= 0 && ingestMemory + memoryChange > compactionThreshold * maxMemory) { + startMinC = true; + } else if (!largestIdleMemTablets.isEmpty()) { + startMinC = true; + // switch largestMemTablets to largestIdleMemTablets + largestMemTablets = largestIdleMemTablets; + log.debug("IDLE minor compaction chosen"); + } + } + + if (startMinC) { + long toBeCompacted = compactionMemory; - for (int i = numWaitingMincs; i < maxMinCs && !largestMemTablets.isEmpty(); i++) { - Entry<Long,TabletInfo> lastEntry = largestMemTablets.lastEntry(); - TabletInfo largest = lastEntry.getValue(); - toBeCompacted += largest.memTableSize; - result.tabletsToMinorCompact.add(largest.extent); - log.debug(String.format("COMPACTING %s total = %,d ingestMemory = %,d", largest.extent.toString(), (ingestMemory + compactionMemory), ingestMemory)); - log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", largest.memTableSize, largest.idleTime / 1000.0, largest.load)); - largestMemTablets.remove(lastEntry.getKey()); - if (toBeCompacted > ingestMemory * MAX_FLUSH_AT_ONCE_PERCENT) - break; - } ++ outer: ++ for (int i = numWaitingMincs; i < maxMinCs && !largestMemTablets.isEmpty(); /* empty */) { ++ Entry<Long,List<TabletInfo>> lastEntry = largestMemTablets.lastEntry(); ++ for (TabletInfo largest : lastEntry.getValue()) { ++ toBeCompacted += largest.memTableSize; ++ result.tabletsToMinorCompact.add(largest.extent); ++ log.debug(String.format("COMPACTING %s total = %,d ingestMemory = %,d", largest.extent.toString(), (ingestMemory + compactionMemory), ingestMemory)); ++ log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", largest.memTableSize, largest.idleTime / 1000.0, largest.load)); ++ if (toBeCompacted > ingestMemory * MAX_FLUSH_AT_ONCE_PERCENT) ++ break outer; ++ i++; ++ } ++ largestMemTablets.remove(lastEntry.getKey()); ++ } + } else if (memoryChange < 0) { + // before idle mincs, starting a minor compaction meant that memoryChange >= 0. + // we thought we might want to remove the "else" if that changed, + // however it seems performing idle compactions shouldn't make the threshold + // change more often, so it is staying for now. + // also, now we have the case where memoryChange < 0 due to an idle compaction, yet + // we are still adjusting the threshold. should this be tracked and prevented? + + // memory change < 0 means a minor compaction occurred + // we want to see how full the memory got during the compaction + // (the goal is for it to have between 80% and 90% memory utilization) + // and adjust the compactionThreshold accordingly + + log.debug(String.format("BEFORE compactionThreshold = %.3f maxObserved = %,d", compactionThreshold, maxObserved)); + if (compactionThreshold < 0.82 && maxObserved < 0.8 * maxMemory) { + // 0.82 * 1.1 is about 0.9, which is our desired max threshold + compactionThreshold *= 1.1; + } else if (compactionThreshold > 0.056 && maxObserved > 0.9 * maxMemory) { + // 0.056 * 0.9 is about 0.05, which is our desired min threshold + compactionThreshold *= 0.9; + } + maxObserved = 0; + + log.debug(String.format("AFTER compactionThreshold = %.3f", compactionThreshold)); + } + + return result; + } + + @Override + public void tabletClosed(KeyExtent extent) {} + + // The load function: memory times the idle time, doubling every 15 mins + static long timeMemoryLoad(long mem, long time) { + double minutesIdle = time / 60000.0; + + return (long) (mem * Math.pow(2, minutesIdle / 15.0)); + } +}