Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT

Conflicts:
        server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dd422b96
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dd422b96
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dd422b96

Branch: refs/heads/1.6.1-SNAPSHOT
Commit: dd422b963c1c372f73114fbbb0f36f954bd91a5f
Parents: b604e1d a61a795
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Mon Jun 16 16:49:28 2014 -0400
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Mon Jun 16 16:49:28 2014 -0400

----------------------------------------------------------------------
 .../tabletserver/LargestFirstMemoryManager.java | 170 +++++++++++--------
 1 file changed, 102 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/dd422b96/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
index dd1a6ef,0000000..b891ad6
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
@@@ -1,200 -1,0 +1,234 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.tabletserver;
 +
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
++import java.util.Map.Entry;
++import java.util.TreeMap;
 +
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +
++/**
++ * The LargestFirstMemoryManager attempts to keep memory between 80% and 90% 
full. It adapts over time the point at which it should start a compaction based 
on
++ * how full memory gets between successive calls. It will also flush idle 
tablets based on a per-table configurable idle time. It will only attempt to 
flush
++ * tablets up to 20% of all memory. And, as the name of the class would 
suggest, it flushes the tablet with the highest memory footprint. However, it 
actually
++ * chooses the tablet as a function of its size doubled for every 15 minutes 
of idle time. 
++ */
 +public class LargestFirstMemoryManager implements MemoryManager {
 +  
 +  private static final Logger log = 
Logger.getLogger(LargestFirstMemoryManager.class);
++  private static final long ZERO_TIME = System.currentTimeMillis();
 +  private static final int TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER = 2;
++  private static final double MAX_FLUSH_AT_ONCE_PERCENT = 0.20;
 +  
 +  private long maxMemory = -1;
 +  private int maxConcurrentMincs;
 +  private int numWaitingMultiplier;
 +  private long prevIngestMemory;
++  // The fraction of memory that needs to be used before we begin flushing.
 +  private double compactionThreshold;
 +  private long maxObserved;
-   private HashMap<Text,Long> mincIdleThresholds;
-   private static final long zerotime = System.currentTimeMillis();
++  private final HashMap<Text,Long> mincIdleThresholds = new 
HashMap<Text,Long>();
 +  private ServerConfiguration config = null;
 +  
++  private static class TabletInfo {
++    final KeyExtent extent;
++    final long memTableSize;
++    final long idleTime;
++    final long load;
++    
++    public TabletInfo(KeyExtent extent, long memTableSize, long idleTime, 
long load) {
++      this.extent = extent;
++      this.memTableSize = memTableSize;
++      this.idleTime = idleTime;
++      this.load = load;
++    }
++  }
++  
++  // A little map that will hold the "largest" N tablets, where largest is a 
result of the timeMemoryLoad function
++  @SuppressWarnings("serial")
++  private static class LargestMap extends TreeMap<Long,TabletInfo> {
++    final int max;
++    
++    LargestMap(int n) {
++      max = n;
++    }
++    
++    @Override
++    public TabletInfo put(Long key, TabletInfo value) {
++      if (size() == max) {
++        if (key.compareTo(this.firstKey()) < 0)
++          return value;
++        try {
++          return super.put(key, value);
++        } finally {
++          super.remove(this.firstKey());
++        }
++      } else {
++        return super.put(key, value);
++      }
++    }
++  }
++  
 +  LargestFirstMemoryManager(long maxMemory, int maxConcurrentMincs, int 
numWaitingMultiplier) {
 +    this();
 +    this.maxMemory = maxMemory;
 +    this.maxConcurrentMincs = maxConcurrentMincs;
 +    this.numWaitingMultiplier = numWaitingMultiplier;
 +  }
 +  
 +  @Override
 +  public void init(ServerConfiguration conf) {
 +    this.config = conf;
 +    maxMemory = 
conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM);
 +    maxConcurrentMincs = 
conf.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT);
 +    numWaitingMultiplier = TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER;
 +  }
 +  
 +  public LargestFirstMemoryManager() {
 +    prevIngestMemory = 0;
 +    compactionThreshold = 0.5;
 +    maxObserved = 0;
-     mincIdleThresholds = new HashMap<Text,Long>();
++  }
++  
++  private long getMinCIdleThreshold(KeyExtent extent) {
++    Text tableId = extent.getTableId();
++    if (!mincIdleThresholds.containsKey(tableId))
++      mincIdleThresholds.put(tableId, 
config.getTableConfiguration(tableId.toString()).getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME));
++    return mincIdleThresholds.get(tableId);
 +  }
 +  
 +  @Override
 +  public MemoryManagementActions getMemoryManagementActions(List<TabletState> 
tablets) {
 +    if (maxMemory < 0)
-       throw new IllegalStateException("need to initialize Largst");
++      throw new IllegalStateException("need to initialize " + 
LargestFirstMemoryManager.class.getName());
++    
++    final int maxMinCs = maxConcurrentMincs * numWaitingMultiplier;
++    
 +    mincIdleThresholds.clear();
++    final MemoryManagementActions result = new MemoryManagementActions();
++    result.tabletsToMinorCompact = new ArrayList<KeyExtent>();
++    
++    TreeMap<Long,TabletInfo> largestMemTablets = new LargestMap(maxMinCs);
++    final TreeMap<Long,TabletInfo> largestIdleMemTablets = new 
LargestMap(maxConcurrentMincs);
++    final long now = System.currentTimeMillis();
++    
 +    long ingestMemory = 0;
 +    long compactionMemory = 0;
-     KeyExtent largestMemTablet = null;
-     long largestMemTableLoad = 0;
-     KeyExtent largestIdleMemTablet = null;
-     long largestIdleMemTableLoad = 0;
-     long mts;
-     long mcmts;
 +    int numWaitingMincs = 0;
-     long idleTime;
-     long tml;
-     long ct = System.currentTimeMillis();
-     
-     long largestMemTableIdleTime = -1, largestMemTableSize = -1;
-     long largestIdleMemTableIdleTime = -1, largestIdleMemTableSize = -1;
 +    
++    // find the largest and most idle tablets
 +    for (TabletState ts : tablets) {
-       mts = ts.getMemTableSize();
-       mcmts = ts.getMinorCompactingMemTableSize();
-       if (ts.getLastCommitTime() > 0)
-         idleTime = ct - ts.getLastCommitTime();
-       else
-         idleTime = ct - zerotime;
-       ingestMemory += mts;
-       tml = timeMemoryLoad(mts, idleTime);
-       if (mcmts == 0 && mts > 0) {
-         if (tml > largestMemTableLoad) {
-           largestMemTableLoad = tml;
-           largestMemTablet = ts.getExtent();
-           largestMemTableSize = mts;
-           largestMemTableIdleTime = idleTime;
-         }
-         Text tableId = ts.getExtent().getTableId();
-         if (!mincIdleThresholds.containsKey(tableId))
-           mincIdleThresholds.put(tableId, 
config.getTableConfiguration(tableId.toString()).getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME));
-         if (idleTime > mincIdleThresholds.get(tableId) && tml > 
largestIdleMemTableLoad) {
-           largestIdleMemTableLoad = tml;
-           largestIdleMemTablet = ts.getExtent();
-           largestIdleMemTableSize = mts;
-           largestIdleMemTableIdleTime = idleTime;
++      final long memTabletSize = ts.getMemTableSize();
++      final long minorCompactingSize = ts.getMinorCompactingMemTableSize();
++      final long idleTime = now - Math.max(ts.getLastCommitTime(), ZERO_TIME);
++      final long timeMemoryLoad = timeMemoryLoad(memTabletSize, idleTime);
++      ingestMemory += memTabletSize;
++      if (minorCompactingSize == 0 && memTabletSize > 0) {
++        TabletInfo tabletInfo = new TabletInfo(ts.getExtent(), memTabletSize, 
idleTime, timeMemoryLoad);
++        largestMemTablets.put(timeMemoryLoad, tabletInfo);
++        if (idleTime > getMinCIdleThreshold(ts.getExtent())) {
++          largestIdleMemTablets.put(timeMemoryLoad, tabletInfo);
 +        }
-         // log.debug("extent: "+ts.getExtent()+" idle threshold: 
"+mincIdleThresholds.get(tableId)+" idle time: "+idleTime+" memtable: "+mts+" 
compacting: "+mcmts);
 +      }
-       // else {
-       // log.debug("skipping extent "+ts.getExtent()+", nothing in memory");
-       // }
 +      
-       compactionMemory += mcmts;
-       if (mcmts > 0)
++      compactionMemory += minorCompactingSize;
++      if (minorCompactingSize > 0)
 +        numWaitingMincs++;
 +    }
 +    
 +    if (ingestMemory + compactionMemory > maxObserved) {
 +      maxObserved = ingestMemory + compactionMemory;
 +    }
 +    
-     long memoryChange = ingestMemory - prevIngestMemory;
++    final long memoryChange = ingestMemory - prevIngestMemory;
 +    prevIngestMemory = ingestMemory;
 +    
-     MemoryManagementActions mma = new MemoryManagementActions();
-     mma.tabletsToMinorCompact = new ArrayList<KeyExtent>();
-     
 +    boolean startMinC = false;
 +    
-     if (numWaitingMincs < maxConcurrentMincs * numWaitingMultiplier) {
++    if (numWaitingMincs < maxMinCs) {
 +      // based on previous ingest memory increase, if we think that the next 
increase will
 +      // take us over the threshold for non-compacting memory, then start a 
minor compaction
 +      // or if the idle time of the chosen tablet is greater than the 
threshold, start a minor compaction
 +      if (memoryChange >= 0 && ingestMemory + memoryChange > 
compactionThreshold * maxMemory) {
 +        startMinC = true;
-       } else if (largestIdleMemTablet != null) {
++      } else if (!largestIdleMemTablets.isEmpty()) {
 +        startMinC = true;
-         // switch largestMemTablet to largestIdleMemTablet
-         largestMemTablet = largestIdleMemTablet;
-         largestMemTableLoad = largestIdleMemTableLoad;
-         largestMemTableSize = largestIdleMemTableSize;
-         largestMemTableIdleTime = largestIdleMemTableIdleTime;
++        // switch largestMemTablets to largestIdleMemTablets
++        largestMemTablets = largestIdleMemTablets;
 +        log.debug("IDLE minor compaction chosen");
 +      }
 +    }
 +    
-     if (startMinC && largestMemTablet != null) {
-       mma.tabletsToMinorCompact.add(largestMemTablet);
-       log.debug(String.format("COMPACTING %s  total = %,d ingestMemory = 
%,d", largestMemTablet.toString(), (ingestMemory + compactionMemory), 
ingestMemory));
-       log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", 
largestMemTableSize, largestMemTableIdleTime / 1000.0, largestMemTableLoad));
++    if (startMinC) {
++      long toBeCompacted = compactionMemory;
++      for (int i = numWaitingMincs; i < maxMinCs && 
!largestMemTablets.isEmpty(); i++) {
++        Entry<Long,TabletInfo> lastEntry = largestMemTablets.lastEntry();
++        TabletInfo largest = lastEntry.getValue();
++        toBeCompacted += largest.memTableSize;
++        result.tabletsToMinorCompact.add(largest.extent);
++        log.debug(String.format("COMPACTING %s  total = %,d ingestMemory = 
%,d", largest.extent.toString(), (ingestMemory + compactionMemory), 
ingestMemory));
++        log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", 
largest.memTableSize, largest.idleTime / 1000.0, largest.load));
++        largestMemTablets.remove(lastEntry.getKey());
++        if (toBeCompacted > ingestMemory * MAX_FLUSH_AT_ONCE_PERCENT)
++          break;
++      }
 +    } else if (memoryChange < 0) {
 +      // before idle mincs, starting a minor compaction meant that 
memoryChange >= 0.
 +      // we thought we might want to remove the "else" if that changed,
 +      // however it seems performing idle compactions shouldn't make the 
threshold
 +      // change more often, so it is staying for now.
 +      // also, now we have the case where memoryChange < 0 due to an idle 
compaction, yet
 +      // we are still adjusting the threshold. should this be tracked and 
prevented?
 +      
 +      // memory change < 0 means a minor compaction occurred
 +      // we want to see how full the memory got during the compaction
 +      // (the goal is for it to have between 80% and 90% memory utilization)
 +      // and adjust the compactionThreshold accordingly
 +      
 +      log.debug(String.format("BEFORE compactionThreshold = %.3f maxObserved 
= %,d", compactionThreshold, maxObserved));
-       
 +      if (compactionThreshold < 0.82 && maxObserved < 0.8 * maxMemory) {
 +        // 0.82 * 1.1 is about 0.9, which is our desired max threshold
 +        compactionThreshold *= 1.1;
 +      } else if (compactionThreshold > 0.056 && maxObserved > 0.9 * 
maxMemory) {
 +        // 0.056 * 0.9 is about 0.05, which is our desired min threshold
 +        compactionThreshold *= 0.9;
 +      }
 +      maxObserved = 0;
 +      
 +      log.debug(String.format("AFTER compactionThreshold = %.3f", 
compactionThreshold));
 +    }
 +    
-     return mma;
++    return result;
 +  }
 +  
 +  @Override
 +  public void tabletClosed(KeyExtent extent) {}
 +  
++  // The load function: memory times the idle time, doubling every 15 mins
 +  static long timeMemoryLoad(long mem, long time) {
 +    double minutesIdle = time / 60000.0;
 +    
 +    return (long) (mem * Math.pow(2, minutesIdle / 15.0));
 +  }
-   
-   public static void main(String[] args) {
-     for (int i = 0; i < 62; i++) {
-       System.out.printf("%d\t%d%n", i, timeMemoryLoad(1, i * 60000l));
-     }
-   }
 +}

Reply via email to