Repository: accumulo
Updated Branches:
  refs/heads/1.6.1-SNAPSHOT b604e1d0f -> dd422b963


ACCUMULO-2905 start multiple MinCs, starting with the largest

ACCUMULO-2905 updates based on [~elserj]'s review

ACCUMULO-2905 improved custom map

ACCUMULO-2905 more review feedback, formatted

ACCUMULO-2905 schedule fewer idle compactions

ACCUMULO-2905 improve javadocs based on rb comments

Conflicts:

        
server/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a61a7951
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a61a7951
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a61a7951

Branch: refs/heads/1.6.1-SNAPSHOT
Commit: a61a795191f5c35fe7e74cdabaad0c129e291fdb
Parents: e6171e6
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Mon Jun 16 16:48:03 2014 -0400
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Mon Jun 16 16:48:03 2014 -0400

----------------------------------------------------------------------
 .../tabletserver/LargestFirstMemoryManager.java | 171 +++++++++++--------
 1 file changed, 103 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a61a7951/server/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
index 1a30b31..42f10ff 100644
--- 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
+++ 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
@@ -19,6 +19,8 @@ package org.apache.accumulo.server.tabletserver;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.Property;
@@ -27,20 +29,68 @@ import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
+/**
+ * The LargestFirstMemoryManager attempts to keep memory between 80% and 90% 
full. It adapts over time the point at which it should start a compaction based 
on
+ * how full memory gets between successive calls. It will also flush idle 
tablets based on a per-table configurable idle time. It will only attempt to 
flush
+ * tablets up to 20% of all memory. And, as the name of the class would 
suggest, it flushes the tablet with the highest memory footprint. However, it 
actually
+ * chooses the tablet as a function of its size doubled for every 15 minutes 
of idle time. 
+ */
 public class LargestFirstMemoryManager implements MemoryManager {
   
   private static final Logger log = 
Logger.getLogger(LargestFirstMemoryManager.class);
+  private static final long ZERO_TIME = System.currentTimeMillis();
+  private static final int TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER = 2;
+  private static final double MAX_FLUSH_AT_ONCE_PERCENT = 0.20;
   
   private long maxMemory = -1;
   private int maxConcurrentMincs;
   private int numWaitingMultiplier;
   private long prevIngestMemory;
+  // The fraction of memory that needs to be used before we begin flushing.
   private double compactionThreshold;
   private long maxObserved;
-  private HashMap<Text,Long> mincIdleThresholds;
-  private static final long zerotime = System.currentTimeMillis();
+  private final HashMap<Text,Long> mincIdleThresholds = new 
HashMap<Text,Long>();
   private ServerConfiguration config = null;
   
+  private static class TabletInfo {
+    final KeyExtent extent;
+    final long memTableSize;
+    final long idleTime;
+    final long load;
+    
+    public TabletInfo(KeyExtent extent, long memTableSize, long idleTime, long 
load) {
+      this.extent = extent;
+      this.memTableSize = memTableSize;
+      this.idleTime = idleTime;
+      this.load = load;
+    }
+  }
+  
+  // A little map that will hold the "largest" N tablets, where largest is a 
result of the timeMemoryLoad function
+  @SuppressWarnings("serial")
+  private static class LargestMap extends TreeMap<Long,TabletInfo> {
+    final int max;
+    
+    LargestMap(int n) {
+      max = n;
+    }
+    
+    @Override
+    public TabletInfo put(Long key, TabletInfo value) {
+      if (size() == max) {
+        if (key.compareTo(this.firstKey()) < 0)
+          return value;
+        try {
+          return super.put(key, value);
+        } finally {
+          super.remove(this.firstKey());
+        }
+      } else {
+        return super.put(key, value);
+      }
+    }
+  }
+  
   LargestFirstMemoryManager(long maxMemory, int maxConcurrentMincs, int 
numWaitingMultiplier) {
     this();
     this.maxMemory = maxMemory;
@@ -59,63 +109,51 @@ public class LargestFirstMemoryManager implements 
MemoryManager {
     prevIngestMemory = 0;
     compactionThreshold = 0.5;
     maxObserved = 0;
-    mincIdleThresholds = new HashMap<Text,Long>();
+  }
+  
+  private long getMinCIdleThreshold(KeyExtent extent) {
+    Text tableId = extent.getTableId();
+    if (!mincIdleThresholds.containsKey(tableId))
+      mincIdleThresholds.put(tableId, 
config.getTableConfiguration(tableId.toString()).getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME));
+    return mincIdleThresholds.get(tableId);
   }
   
   @Override
   public MemoryManagementActions getMemoryManagementActions(List<TabletState> 
tablets) {
     if (maxMemory < 0)
-      throw new IllegalStateException("need to initialize Largst");
+      throw new IllegalStateException("need to initialize " + 
LargestFirstMemoryManager.class.getName());
+    
+    final int maxMinCs = maxConcurrentMincs * numWaitingMultiplier;
+    
     mincIdleThresholds.clear();
+    final MemoryManagementActions result = new MemoryManagementActions();
+    result.tabletsToMinorCompact = new ArrayList<KeyExtent>();
+    
+    TreeMap<Long,TabletInfo> largestMemTablets = new LargestMap(maxMinCs);
+    final TreeMap<Long,TabletInfo> largestIdleMemTablets = new 
LargestMap(maxConcurrentMincs);
+    final long now = System.currentTimeMillis();
+    
     long ingestMemory = 0;
     long compactionMemory = 0;
-    KeyExtent largestMemTablet = null;
-    long largestMemTableLoad = 0;
-    KeyExtent largestIdleMemTablet = null;
-    long largestIdleMemTableLoad = 0;
-    long mts;
-    long mcmts;
     int numWaitingMincs = 0;
-    long idleTime;
-    long tml;
-    long ct = System.currentTimeMillis();
-    
-    long largestMemTableIdleTime = -1, largestMemTableSize = -1;
-    long largestIdleMemTableIdleTime = -1, largestIdleMemTableSize = -1;
     
+    // find the largest and most idle tablets
     for (TabletState ts : tablets) {
-      mts = ts.getMemTableSize();
-      mcmts = ts.getMinorCompactingMemTableSize();
-      if (ts.getLastCommitTime() > 0)
-        idleTime = ct - ts.getLastCommitTime();
-      else
-        idleTime = ct - zerotime;
-      ingestMemory += mts;
-      tml = timeMemoryLoad(mts, idleTime);
-      if (mcmts == 0 && mts > 0) {
-        if (tml > largestMemTableLoad) {
-          largestMemTableLoad = tml;
-          largestMemTablet = ts.getExtent();
-          largestMemTableSize = mts;
-          largestMemTableIdleTime = idleTime;
-        }
-        Text tableId = ts.getExtent().getTableId();
-        if (!mincIdleThresholds.containsKey(tableId))
-          mincIdleThresholds.put(tableId, 
config.getTableConfiguration(tableId.toString()).getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME));
-        if (idleTime > mincIdleThresholds.get(tableId) && tml > 
largestIdleMemTableLoad) {
-          largestIdleMemTableLoad = tml;
-          largestIdleMemTablet = ts.getExtent();
-          largestIdleMemTableSize = mts;
-          largestIdleMemTableIdleTime = idleTime;
+      final long memTabletSize = ts.getMemTableSize();
+      final long minorCompactingSize = ts.getMinorCompactingMemTableSize();
+      final long idleTime = now - Math.max(ts.getLastCommitTime(), ZERO_TIME);
+      final long timeMemoryLoad = timeMemoryLoad(memTabletSize, idleTime);
+      ingestMemory += memTabletSize;
+      if (minorCompactingSize == 0 && memTabletSize > 0) {
+        TabletInfo tabletInfo = new TabletInfo(ts.getExtent(), memTabletSize, 
idleTime, timeMemoryLoad);
+        largestMemTablets.put(timeMemoryLoad, tabletInfo);
+        if (idleTime > getMinCIdleThreshold(ts.getExtent())) {
+          largestIdleMemTablets.put(timeMemoryLoad, tabletInfo);
         }
-        // log.debug("extent: "+ts.getExtent()+" idle threshold: 
"+mincIdleThresholds.get(tableId)+" idle time: "+idleTime+" memtable: "+mts+" 
compacting: "+mcmts);
       }
-      // else {
-      // log.debug("skipping extent "+ts.getExtent()+", nothing in memory");
-      // }
       
-      compactionMemory += mcmts;
-      if (mcmts > 0)
+      compactionMemory += minorCompactingSize;
+      if (minorCompactingSize > 0)
         numWaitingMincs++;
     }
     
@@ -123,35 +161,38 @@ public class LargestFirstMemoryManager implements 
MemoryManager {
       maxObserved = ingestMemory + compactionMemory;
     }
     
-    long memoryChange = ingestMemory - prevIngestMemory;
+    final long memoryChange = ingestMemory - prevIngestMemory;
     prevIngestMemory = ingestMemory;
     
-    MemoryManagementActions mma = new MemoryManagementActions();
-    mma.tabletsToMinorCompact = new ArrayList<KeyExtent>();
-    
     boolean startMinC = false;
     
-    if (numWaitingMincs < maxConcurrentMincs * numWaitingMultiplier) {
+    if (numWaitingMincs < maxMinCs) {
       // based on previous ingest memory increase, if we think that the next 
increase will
       // take us over the threshold for non-compacting memory, then start a 
minor compaction
       // or if the idle time of the chosen tablet is greater than the 
threshold, start a minor compaction
       if (memoryChange >= 0 && ingestMemory + memoryChange > 
compactionThreshold * maxMemory) {
         startMinC = true;
-      } else if (largestIdleMemTablet != null) {
+      } else if (!largestIdleMemTablets.isEmpty()) {
         startMinC = true;
-        // switch largestMemTablet to largestIdleMemTablet
-        largestMemTablet = largestIdleMemTablet;
-        largestMemTableLoad = largestIdleMemTableLoad;
-        largestMemTableSize = largestIdleMemTableSize;
-        largestMemTableIdleTime = largestIdleMemTableIdleTime;
+        // switch largestMemTablets to largestIdleMemTablets
+        largestMemTablets = largestIdleMemTablets;
         log.debug("IDLE minor compaction chosen");
       }
     }
     
-    if (startMinC && largestMemTablet != null) {
-      mma.tabletsToMinorCompact.add(largestMemTablet);
-      log.debug(String.format("COMPACTING %s  total = %,d ingestMemory = %,d", 
largestMemTablet.toString(), (ingestMemory + compactionMemory), ingestMemory));
-      log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", 
largestMemTableSize, largestMemTableIdleTime / 1000.0, largestMemTableLoad));
+    if (startMinC) {
+      long toBeCompacted = compactionMemory;
+      for (int i = numWaitingMincs; i < maxMinCs && 
!largestMemTablets.isEmpty(); i++) {
+        Entry<Long,TabletInfo> lastEntry = largestMemTablets.lastEntry();
+        TabletInfo largest = lastEntry.getValue();
+        toBeCompacted += largest.memTableSize;
+        result.tabletsToMinorCompact.add(largest.extent);
+        log.debug(String.format("COMPACTING %s  total = %,d ingestMemory = 
%,d", largest.extent.toString(), (ingestMemory + compactionMemory), 
ingestMemory));
+        log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", 
largest.memTableSize, largest.idleTime / 1000.0, largest.load));
+        largestMemTablets.remove(lastEntry.getKey());
+        if (toBeCompacted > ingestMemory * MAX_FLUSH_AT_ONCE_PERCENT)
+          break;
+      }
     } else if (memoryChange < 0) {
       // before idle mincs, starting a minor compaction meant that 
memoryChange >= 0.
       // we thought we might want to remove the "else" if that changed,
@@ -166,7 +207,6 @@ public class LargestFirstMemoryManager implements 
MemoryManager {
       // and adjust the compactionThreshold accordingly
       
       log.debug(String.format("BEFORE compactionThreshold = %.3f maxObserved = 
%,d", compactionThreshold, maxObserved));
-      
       if (compactionThreshold < 0.82 && maxObserved < 0.8 * maxMemory) {
         // 0.82 * 1.1 is about 0.9, which is our desired max threshold
         compactionThreshold *= 1.1;
@@ -179,21 +219,16 @@ public class LargestFirstMemoryManager implements 
MemoryManager {
       log.debug(String.format("AFTER compactionThreshold = %.3f", 
compactionThreshold));
     }
     
-    return mma;
+    return result;
   }
   
   @Override
   public void tabletClosed(KeyExtent extent) {}
   
+  // The load function: memory times the idle time, doubling every 15 mins
   static long timeMemoryLoad(long mem, long time) {
     double minutesIdle = time / 60000.0;
     
     return (long) (mem * Math.pow(2, minutesIdle / 15.0));
   }
-  
-  public static void main(String[] args) {
-    for (int i = 0; i < 62; i++) {
-      System.out.printf("%d\t%d%n", i, timeMemoryLoad(1, i * 60000l));
-    }
-  }
 }

Reply via email to