Updated Branches:
  refs/heads/master 6b537d5e0 -> f3e3869b9

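ACCUMULO-1808 Created a compaction strategy with a size limit and fixed bugs
found while testing it: compaction strategy option keys now have the property
prefix stripped (previously only the prefix itself was kept), compaction
strategy properties are now accepted as valid table properties, and the
compaction strategy is now only consulted for compactions that are neither
chop nor user compactions.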


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f3e3869b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f3e3869b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f3e3869b

Branch: refs/heads/master
Commit: f3e3869b94d31077b28acae1e7d6ea7fd9297e03
Parents: 6b537d5
Author: Keith Turner <ktur...@apache.org>
Authored: Wed Oct 23 17:43:27 2013 -0400
Committer: Keith Turner <ktur...@apache.org>
Committed: Wed Oct 23 17:45:13 2013 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java | 10 ++-
 .../accumulo/server/tabletserver/Tablet.java    | 30 +++++----
 .../compaction/MajorCompactionRequest.java      |  8 ++-
 .../compaction/SizeLimitCompactionStrategy.java | 70 ++++++++++++++++++++
 .../SizeLimitCompactionStrategyTest.java        | 69 +++++++++++++++++++
 5 files changed, 170 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3e3869b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 43ddb18..23bae10 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -534,7 +534,8 @@ public enum Property {
     }
 
     return validTableProperties.contains(key) || 
key.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey())
-        || key.startsWith(Property.TABLE_ITERATOR_PREFIX.getKey()) || 
key.startsWith(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey());
+        || key.startsWith(Property.TABLE_ITERATOR_PREFIX.getKey()) || 
key.startsWith(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey())
+        || key.startsWith(Property.TABLE_COMPACTION_STRATEGY_PREFIX.getKey());
   }
 
   private static final EnumSet<Property> fixedProperties = 
EnumSet.of(Property.TSERV_CLIENTPORT, Property.TSERV_NATIVEMAP_ENABLED,
@@ -552,7 +553,8 @@ public enum Property {
     // white list prefixes
     return key.startsWith(Property.TABLE_PREFIX.getKey()) || 
key.startsWith(Property.TSERV_PREFIX.getKey()) || 
key.startsWith(Property.LOGGER_PREFIX.getKey())
         || key.startsWith(Property.MASTER_PREFIX.getKey()) || 
key.startsWith(Property.GC_PREFIX.getKey())
-        || key.startsWith(Property.MONITOR_PREFIX.getKey() + "banner.") || 
key.startsWith(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey());
+        || key.startsWith(Property.MONITOR_PREFIX.getKey() + "banner.") || 
key.startsWith(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey())
+        || key.startsWith(Property.TABLE_COMPACTION_STRATEGY_PREFIX.getKey());
   }
 
   public static Property getPropertyByKey(String key) {
@@ -592,10 +594,17 @@ public enum Property {
 
+  /**
+   * Gets the options configured for a table's compaction strategy.
+   *
+   * @param tableConf the table's configuration
+   * @return a new map from option name (the property key with {@link #TABLE_COMPACTION_STRATEGY_PREFIX} removed) to its value
+   */
   public static Map<String,String> getCompactionStrategyOptions(AccumuloConfiguration tableConf) {
     Map<String,String> longNames = tableConf.getAllPropertiesWithPrefix(Property.TABLE_COMPACTION_STRATEGY_PREFIX);
     Map<String,String> result = new HashMap<String, String>();
     for (Entry<String,String> entry : longNames.entrySet()) {
-      result.put(entry.getKey().substring(0, Property.TABLE_COMPACTION_STRATEGY_PREFIX.getKey().length()), entry.getValue());
+      // keep only the part of the key after the prefix; that is the option name handed to the strategy
+      result.put(entry.getKey().substring(Property.TABLE_COMPACTION_STRATEGY_PREFIX.getKey().length()), entry.getValue());
     }
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3e3869b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index e5ca4d6..c734ff7 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -3034,10 +3034,10 @@ public class Tablet {
   }
   
   
- private Map<FileRef,Pair<Key,Key>> getFirstAndLastKeys(MajorCompactionRequest 
request) throws IOException {
+  private Map<FileRef,Pair<Key,Key>> 
getFirstAndLastKeys(SortedMap<FileRef,DataFileValue> allFiles) throws 
IOException {
     Map<FileRef,Pair<Key,Key>> result = new HashMap<FileRef,Pair<Key,Key>>();
     FileOperations fileFactory = FileOperations.getInstance();
-    for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) {
+    for (Entry<FileRef,DataFileValue> entry : allFiles.entrySet()) {
       FileRef file = entry.getKey();
       FileSystem ns = fs.getFileSystemByPath(file.path());
       FileSKVIterator openReader = 
fileFactory.openReader(file.path().toString(), true, ns, ns.getConf(), 
this.getTableConfiguration());
@@ -3111,13 +3111,15 @@ public class Tablet {
     // acquire file info outside of tablet lock
     CompactionStrategy strategy  = 
Property.createInstanceFromPropertyName(acuTableConf, 
Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class, new 
DefaultCompactionStrategy());
     strategy.init(Property.getCompactionStrategyOptions(acuTableConf));
-    MajorCompactionRequest request = new MajorCompactionRequest(extent, 
reason, fs, acuTableConf);
-    request.setFiles(datafileManager.getDatafileSizes());
-    strategy.gatherInformation(request);
+
     
     Map<FileRef,Pair<Key,Key>> firstAndLastKeys = null;
     if (reason == MajorCompactionReason.CHOP) {
-        firstAndLastKeys = getFirstAndLastKeys(request);
+      firstAndLastKeys = 
getFirstAndLastKeys(datafileManager.getDatafileSizes());
+    } else if (reason != MajorCompactionReason.USER) {
+      MajorCompactionRequest request = new MajorCompactionRequest(extent, 
reason, fs, acuTableConf);
+      request.setFiles(datafileManager.getDatafileSizes());
+      strategy.gatherInformation(request);
     }
     
     Map<FileRef, DataFileValue> filesToCompact;
@@ -3151,14 +3153,16 @@ public class Tablet {
         // removed by a major compaction
         cleanUpFiles(fs, fs.listStatus(this.location), false);
       }
-      request.setFiles(datafileManager.getDatafileSizes());
+      SortedMap<FileRef,DataFileValue> allFiles = 
datafileManager.getDatafileSizes();
       List<FileRef> inputFiles = new ArrayList<FileRef>();
-      if (request.getReason() == MajorCompactionReason.CHOP) {
+      if (reason == MajorCompactionReason.CHOP) {
         // enforce rules: files with keys outside our range need to be 
compacted
-        inputFiles.addAll(findChopFiles(extent, firstAndLastKeys, 
request.getFiles().keySet()));
-      } else if (request.getReason() == MajorCompactionReason.USER) {
-        inputFiles.addAll(request.getFiles().keySet());
+        inputFiles.addAll(findChopFiles(extent, firstAndLastKeys, 
allFiles.keySet()));
+      } else if (reason == MajorCompactionReason.USER) {
+        inputFiles.addAll(allFiles.keySet());
       } else {
+        MajorCompactionRequest request = new MajorCompactionRequest(extent, 
reason, fs, acuTableConf);
+        request.setFiles(allFiles);
         plan = strategy.getCompactionPlan(request);
         if (plan != null)
           inputFiles.addAll(plan.inputFiles);
@@ -3172,9 +3176,9 @@ public class Tablet {
       droppedFiles.addAll(inputFiles);
       if (plan != null)
         droppedFiles.addAll(plan.deleteFiles);
-      propogateDeletes = !(droppedFiles.equals(request.getFiles().keySet()));
+      propogateDeletes = !(droppedFiles.equals(allFiles.keySet()));
       log.debug("Major compaction plan: " + plan + " propogate deletes : " + 
propogateDeletes);
-      filesToCompact = new HashMap<FileRef, DataFileValue>(request.getFiles());
+      filesToCompact = new HashMap<FileRef,DataFileValue>(allFiles);
       filesToCompact.keySet().retainAll(inputFiles);
       
       t3 = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3e3869b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
index cde9f29..cadf16d 100644
--- 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
+++ 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.fs.Path;
 /**
  * Information that can be used to determine how a tablet is to be major 
compacted, if needed.
  */
-public class MajorCompactionRequest {
+public class MajorCompactionRequest implements Cloneable {
   final private KeyExtent extent;
   final private MajorCompactionReason reason;
   final private VolumeManager volumeManager;
@@ -54,6 +54,12 @@ public class MajorCompactionRequest {
     this.files = Collections.emptyMap();
   }
   
+  public MajorCompactionRequest(MajorCompactionRequest mcr) {
+    // copy constructor: same extent, reason, volume manager, table config and files as mcr
+    this(mcr.extent, mcr.reason, mcr.volumeManager, mcr.tableConfig);
+    this.files = mcr.files; // relies on mcr.files already being unmodifiable, so it is shared rather than wrapped again
+  }
+
   public KeyExtent getExtent() {
     return extent;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3e3869b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategy.java
 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategy.java
new file mode 100644
index 0000000..f6c62b5
--- /dev/null
+++ 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategy.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.compaction;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+
+/**
+ * A {@link DefaultCompactionStrategy} that only considers files whose size is at most a configured limit. Larger files are dropped from every request
+ * before it is handed to the default strategy, so the default strategy never sees them.
+ * <p>
+ * The limit is read from the required option {@value #SIZE_LIMIT_OPT} and parsed with {@link AccumuloConfiguration#getMemoryInBytes(String)}, e.g. "500M".
+ */
+public class SizeLimitCompactionStrategy extends DefaultCompactionStrategy {
+  /** Name of the option holding the size limit; files at or under this size may be compacted. */
+  public static final String SIZE_LIMIT_OPT = "sizeLimit";
+
+  /** Largest file size, in bytes, that is passed through to the default strategy. */
+  private long limit;
+
+  /**
+   * Reads the size limit from {@code options}.
+   *
+   * @throws IllegalArgumentException
+   *           if the {@value #SIZE_LIMIT_OPT} option is not set
+   */
+  @Override
+  public void init(Map<String,String> options) {
+    String limitVal = options.get(SIZE_LIMIT_OPT);
+    if (limitVal == null) {
+      // fail with a clear message instead of letting getMemoryInBytes choke on a null value
+      throw new IllegalArgumentException("Missing required compaction strategy option " + SIZE_LIMIT_OPT);
+    }
+    limit = AccumuloConfiguration.getMemoryInBytes(limitVal);
+  }
+
+  /**
+   * Returns a copy of {@code mcr} holding only the files whose size is at most the limit; {@code mcr} itself is left unchanged.
+   */
+  private MajorCompactionRequest filterFiles(MajorCompactionRequest mcr) {
+    Map<FileRef,DataFileValue> filteredFiles = new HashMap<FileRef,DataFileValue>();
+    for (Entry<FileRef,DataFileValue> entry : mcr.getFiles().entrySet()) {
+      if (entry.getValue().getSize() <= limit) {
+        filteredFiles.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    MajorCompactionRequest filtered = new MajorCompactionRequest(mcr);
+    filtered.setFiles(filteredFiles);
+    return filtered;
+  }
+
+  @Override
+  public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
+    return super.shouldCompact(filterFiles(request));
+  }
+
+  @Override
+  public void gatherInformation(MajorCompactionRequest request) throws IOException {
+    super.gatherInformation(filterFiles(request));
+  }
+
+  @Override
+  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
+    return super.getCompactionPlan(filterFiles(request));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3e3869b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategyTest.java
 
b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategyTest.java
new file mode 100644
index 0000000..daeb748
--- /dev/null
+++ 
b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategyTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.compaction;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link SizeLimitCompactionStrategy}.
+ */
+public class SizeLimitCompactionStrategyTest {
+  private Map<FileRef,DataFileValue> nfl(String... sa) {
+    // sa holds alternating file name / size pairs; sizes use AccumuloConfiguration.getMemoryInBytes syntax such as "2G"
+    HashMap<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();
+    for (int i = 0; i < sa.length; i += 2) {
+      ret.put(new FileRef(sa[i]), new DataFileValue(AccumuloConfiguration.getMemoryInBytes(sa[i + 1]), 1));
+    }
+
+    return ret;
+  }
+
+  @Test
+  public void testLimits() throws IOException {
+    SizeLimitCompactionStrategy slcs = new SizeLimitCompactionStrategy();
+    HashMap<String,String> opts = new HashMap<String,String>();
+    opts.put(SizeLimitCompactionStrategy.SIZE_LIMIT_OPT, "1G");
+    // files larger than 1G should be hidden from the default strategy
+    slcs.init(opts);
+
+    KeyExtent ke = new KeyExtent(new Text("0"), null, null);
+    MajorCompactionRequest mcr = new MajorCompactionRequest(ke, MajorCompactionReason.NORMAL, null, AccumuloConfiguration.getDefaultConfiguration());
+    // all four files exceed the limit, so nothing should be compacted
+    mcr.setFiles(nfl("f1", "2G", "f2", "2G", "f3", "2G", "f4", "2G"));
+
+    Assert.assertFalse(slcs.shouldCompact(mcr));
+    Assert.assertEquals(0, slcs.getCompactionPlan(mcr).inputFiles.size());
+    Assert.assertEquals(4, mcr.getFiles().size()); // the strategy must not modify the caller's request
+    // add four files under the limit; only those should be selected for compaction
+    mcr.setFiles(nfl("f1", "2G", "f2", "2G", "f3", "2G", "f4", "2G", "f5", "500M", "f6", "500M", "f7", "500M", "f8", "500M"));
+
+    Assert.assertTrue(slcs.shouldCompact(mcr));
+    Assert.assertEquals(nfl("f5", "500M", "f6", "500M", "f7", "500M", "f8", "500M").keySet(), new HashSet<FileRef>(slcs.getCompactionPlan(mcr).inputFiles));
+    Assert.assertEquals(8, mcr.getFiles().size()); // again, the caller's request is left unmodified
+  }
+}

Reply via email to