Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT Conflicts: server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dd422b96 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dd422b96 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dd422b96 Branch: refs/heads/1.6.1-SNAPSHOT Commit: dd422b963c1c372f73114fbbb0f36f954bd91a5f Parents: b604e1d a61a795 Author: Eric C. Newton <eric.new...@gmail.com> Authored: Mon Jun 16 16:49:28 2014 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Mon Jun 16 16:49:28 2014 -0400 ---------------------------------------------------------------------- .../tabletserver/LargestFirstMemoryManager.java | 170 +++++++++++-------- 1 file changed, 102 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/dd422b96/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java index dd1a6ef,0000000..b891ad6 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java @@@ -1,200 -1,0 +1,234 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.tabletserver; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; ++import java.util.Map.Entry; ++import java.util.TreeMap; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + ++/** ++ * The LargestFirstMemoryManager attempts to keep memory between 80% and 90% full. It adapts over time the point at which it should start a compaction based on ++ * how full memory gets between successive calls. It will also flush idle tablets based on a per-table configurable idle time. It will only attempt to flush ++ * tablets up to 20% of all memory. And, as the name of the class would suggest, it flushes the tablet with the highest memory footprint. However, it actually ++ * chooses the tablet as a function of its size doubled for every 15 minutes of idle time. ++ */ +public class LargestFirstMemoryManager implements MemoryManager { + + private static final Logger log = Logger.getLogger(LargestFirstMemoryManager.class); ++ private static final long ZERO_TIME = System.currentTimeMillis(); + private static final int TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER = 2; ++ private static final double MAX_FLUSH_AT_ONCE_PERCENT = 0.20; + + private long maxMemory = -1; + private int maxConcurrentMincs; + private int numWaitingMultiplier; + private long prevIngestMemory; ++ // The fraction of memory that needs to be used before we begin flushing. + private double compactionThreshold; + private long maxObserved; - private HashMap<Text,Long> mincIdleThresholds; - private static final long zerotime = System.currentTimeMillis(); ++ private final HashMap<Text,Long> mincIdleThresholds = new HashMap<Text,Long>(); + private ServerConfiguration config = null; + ++ private static class TabletInfo { ++ final KeyExtent extent; ++ final long memTableSize; ++ final long idleTime; ++ final long load; ++ ++ public TabletInfo(KeyExtent extent, long memTableSize, long idleTime, long load) { ++ this.extent = extent; ++ this.memTableSize = memTableSize; ++ this.idleTime = idleTime; ++ this.load = load; ++ } ++ } ++ ++ // A little map that will hold the "largest" N tablets, where largest is a result of the timeMemoryLoad function ++ @SuppressWarnings("serial") ++ private static class LargestMap extends TreeMap<Long,TabletInfo> { ++ final int max; ++ ++ LargestMap(int n) { ++ max = n; ++ } ++ ++ @Override ++ public TabletInfo put(Long key, TabletInfo value) { ++ if (size() == max) { ++ if (key.compareTo(this.firstKey()) < 0) ++ return value; ++ try { ++ return super.put(key, value); ++ } finally { ++ super.remove(this.firstKey()); ++ } ++ } else { ++ return super.put(key, value); ++ } ++ } ++ } ++ + LargestFirstMemoryManager(long maxMemory, int maxConcurrentMincs, int numWaitingMultiplier) { + this(); + this.maxMemory = maxMemory; + this.maxConcurrentMincs = maxConcurrentMincs; + this.numWaitingMultiplier = numWaitingMultiplier; + } + + @Override + public void init(ServerConfiguration conf) { + this.config = conf; + maxMemory = conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM); + maxConcurrentMincs = conf.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT); + numWaitingMultiplier = TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER; + } + + public LargestFirstMemoryManager() { + prevIngestMemory = 0; + compactionThreshold = 0.5; + maxObserved = 0; - mincIdleThresholds = new HashMap<Text,Long>(); ++ } ++ ++ private long getMinCIdleThreshold(KeyExtent extent) { ++ Text tableId = extent.getTableId(); ++ if (!mincIdleThresholds.containsKey(tableId)) ++ mincIdleThresholds.put(tableId, config.getTableConfiguration(tableId.toString()).getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME)); ++ return mincIdleThresholds.get(tableId); + } + + @Override + public MemoryManagementActions getMemoryManagementActions(List<TabletState> tablets) { + if (maxMemory < 0) - throw new IllegalStateException("need to initialize Largst"); ++ throw new IllegalStateException("need to initialize " + LargestFirstMemoryManager.class.getName()); ++ ++ final int maxMinCs = maxConcurrentMincs * numWaitingMultiplier; ++ + mincIdleThresholds.clear(); ++ final MemoryManagementActions result = new MemoryManagementActions(); ++ result.tabletsToMinorCompact = new ArrayList<KeyExtent>(); ++ ++ TreeMap<Long,TabletInfo> largestMemTablets = new LargestMap(maxMinCs); ++ final TreeMap<Long,TabletInfo> largestIdleMemTablets = new LargestMap(maxConcurrentMincs); ++ final long now = System.currentTimeMillis(); ++ + long ingestMemory = 0; + long compactionMemory = 0; - KeyExtent largestMemTablet = null; - long largestMemTableLoad = 0; - KeyExtent largestIdleMemTablet = null; - long largestIdleMemTableLoad = 0; - long mts; - long mcmts; + int numWaitingMincs = 0; - long idleTime; - long tml; - long ct = System.currentTimeMillis(); - - long largestMemTableIdleTime = -1, largestMemTableSize = -1; - long largestIdleMemTableIdleTime = -1, largestIdleMemTableSize = -1; + ++ // find the largest and most idle tablets + for (TabletState ts : tablets) { - mts = ts.getMemTableSize(); - mcmts = ts.getMinorCompactingMemTableSize(); - if (ts.getLastCommitTime() > 0) - idleTime = ct - ts.getLastCommitTime(); - else - idleTime = ct - zerotime; - ingestMemory += mts; - tml = timeMemoryLoad(mts, idleTime); - if (mcmts == 0 && mts > 0) { - if (tml > largestMemTableLoad) { - largestMemTableLoad = tml; - largestMemTablet = ts.getExtent(); - largestMemTableSize = mts; - largestMemTableIdleTime = idleTime; - } - Text tableId = ts.getExtent().getTableId(); - if (!mincIdleThresholds.containsKey(tableId)) - mincIdleThresholds.put(tableId, config.getTableConfiguration(tableId.toString()).getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME)); - if (idleTime > mincIdleThresholds.get(tableId) && tml > largestIdleMemTableLoad) { - largestIdleMemTableLoad = tml; - largestIdleMemTablet = ts.getExtent(); - largestIdleMemTableSize = mts; - largestIdleMemTableIdleTime = idleTime; ++ final long memTabletSize = ts.getMemTableSize(); ++ final long minorCompactingSize = ts.getMinorCompactingMemTableSize(); ++ final long idleTime = now - Math.max(ts.getLastCommitTime(), ZERO_TIME); ++ final long timeMemoryLoad = timeMemoryLoad(memTabletSize, idleTime); ++ ingestMemory += memTabletSize; ++ if (minorCompactingSize == 0 && memTabletSize > 0) { ++ TabletInfo tabletInfo = new TabletInfo(ts.getExtent(), memTabletSize, idleTime, timeMemoryLoad); ++ largestMemTablets.put(timeMemoryLoad, tabletInfo); ++ if (idleTime > getMinCIdleThreshold(ts.getExtent())) { ++ largestIdleMemTablets.put(timeMemoryLoad, tabletInfo); + } - // log.debug("extent: "+ts.getExtent()+" idle threshold: "+mincIdleThresholds.get(tableId)+" idle time: "+idleTime+" memtable: "+mts+" compacting: "+mcmts); + } - // else { - // log.debug("skipping extent "+ts.getExtent()+", nothing in memory"); - // } + - compactionMemory += mcmts; - if (mcmts > 0) ++ compactionMemory += minorCompactingSize; ++ if (minorCompactingSize > 0) + numWaitingMincs++; + } + + if (ingestMemory + compactionMemory > maxObserved) { + maxObserved = ingestMemory + compactionMemory; + } + - long memoryChange = ingestMemory - prevIngestMemory; ++ final long memoryChange = ingestMemory - prevIngestMemory; + prevIngestMemory = ingestMemory; + - MemoryManagementActions mma = new MemoryManagementActions(); - mma.tabletsToMinorCompact = new ArrayList<KeyExtent>(); - + boolean startMinC = false; + - if (numWaitingMincs < maxConcurrentMincs * numWaitingMultiplier) { ++ if (numWaitingMincs < maxMinCs) { + // based on previous ingest memory increase, if we think that the next increase will + // take us over the threshold for non-compacting memory, then start a minor compaction + // or if the idle time of the chosen tablet is greater than the threshold, start a minor compaction + if (memoryChange >= 0 && ingestMemory + memoryChange > compactionThreshold * maxMemory) { + startMinC = true; - } else if (largestIdleMemTablet != null) { ++ } else if (!largestIdleMemTablets.isEmpty()) { + startMinC = true; - // switch largestMemTablet to largestIdleMemTablet - largestMemTablet = largestIdleMemTablet; - largestMemTableLoad = largestIdleMemTableLoad; - largestMemTableSize = largestIdleMemTableSize; - largestMemTableIdleTime = largestIdleMemTableIdleTime; ++ // switch largestMemTablets to largestIdleMemTablets ++ largestMemTablets = largestIdleMemTablets; + log.debug("IDLE minor compaction chosen"); + } + } + - if (startMinC && largestMemTablet != null) { - mma.tabletsToMinorCompact.add(largestMemTablet); - log.debug(String.format("COMPACTING %s total = %,d ingestMemory = %,d", largestMemTablet.toString(), (ingestMemory + compactionMemory), ingestMemory)); - log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", largestMemTableSize, largestMemTableIdleTime / 1000.0, largestMemTableLoad)); ++ if (startMinC) { ++ long toBeCompacted = compactionMemory; ++ for (int i = numWaitingMincs; i < maxMinCs && !largestMemTablets.isEmpty(); i++) { ++ Entry<Long,TabletInfo> lastEntry = largestMemTablets.lastEntry(); ++ TabletInfo largest = lastEntry.getValue(); ++ toBeCompacted += largest.memTableSize; ++ result.tabletsToMinorCompact.add(largest.extent); ++ log.debug(String.format("COMPACTING %s total = %,d ingestMemory = %,d", largest.extent.toString(), (ingestMemory + compactionMemory), ingestMemory)); ++ log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", largest.memTableSize, largest.idleTime / 1000.0, largest.load)); ++ largestMemTablets.remove(lastEntry.getKey()); ++ if (toBeCompacted > ingestMemory * MAX_FLUSH_AT_ONCE_PERCENT) ++ break; ++ } + } else if (memoryChange < 0) { + // before idle mincs, starting a minor compaction meant that memoryChange >= 0. + // we thought we might want to remove the "else" if that changed, + // however it seems performing idle compactions shouldn't make the threshold + // change more often, so it is staying for now. + // also, now we have the case where memoryChange < 0 due to an idle compaction, yet + // we are still adjusting the threshold. should this be tracked and prevented? + + // memory change < 0 means a minor compaction occurred + // we want to see how full the memory got during the compaction + // (the goal is for it to have between 80% and 90% memory utilization) + // and adjust the compactionThreshold accordingly + + log.debug(String.format("BEFORE compactionThreshold = %.3f maxObserved = %,d", compactionThreshold, maxObserved)); - + if (compactionThreshold < 0.82 && maxObserved < 0.8 * maxMemory) { + // 0.82 * 1.1 is about 0.9, which is our desired max threshold + compactionThreshold *= 1.1; + } else if (compactionThreshold > 0.056 && maxObserved > 0.9 * maxMemory) { + // 0.056 * 0.9 is about 0.05, which is our desired min threshold + compactionThreshold *= 0.9; + } + maxObserved = 0; + + log.debug(String.format("AFTER compactionThreshold = %.3f", compactionThreshold)); + } + - return mma; ++ return result; + } + + @Override + public void tabletClosed(KeyExtent extent) {} + ++ // The load function: memory times the idle time, doubling every 15 mins + static long timeMemoryLoad(long mem, long time) { + double minutesIdle = time / 60000.0; + + return (long) (mem * Math.pow(2, minutesIdle / 15.0)); + } - - public static void main(String[] args) { - for (int i = 0; i < 62; i++) { - System.out.printf("%d\t%d%n", i, timeMemoryLoad(1, i * 60000l)); - } - } +}