Updated Branches: refs/heads/master 6b537d5e0 -> f3e3869b9
ACCUMULO-1808 created a compaction strategy with a size limit and fixed some bugs found while testing Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f3e3869b Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f3e3869b Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f3e3869b Branch: refs/heads/master Commit: f3e3869b94d31077b28acae1e7d6ea7fd9297e03 Parents: 6b537d5 Author: Keith Turner <ktur...@apache.org> Authored: Wed Oct 23 17:43:27 2013 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Wed Oct 23 17:45:13 2013 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 10 ++- .../accumulo/server/tabletserver/Tablet.java | 30 +++++---- .../compaction/MajorCompactionRequest.java | 8 ++- .../compaction/SizeLimitCompactionStrategy.java | 70 ++++++++++++++++++++ .../SizeLimitCompactionStrategyTest.java | 69 +++++++++++++++++++ 5 files changed, 170 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3e3869b/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 43ddb18..23bae10 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -534,7 +534,8 @@ public enum Property { } return validTableProperties.contains(key) || key.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey()) - || key.startsWith(Property.TABLE_ITERATOR_PREFIX.getKey()) || key.startsWith(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey()); + || key.startsWith(Property.TABLE_ITERATOR_PREFIX.getKey()) || key.startsWith(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey()) + || key.startsWith(Property.TABLE_COMPACTION_STRATEGY_PREFIX.getKey()); } private static final EnumSet<Property> fixedProperties = EnumSet.of(Property.TSERV_CLIENTPORT, Property.TSERV_NATIVEMAP_ENABLED, @@ -552,7 +553,8 @@ public enum Property { // white list prefixes return key.startsWith(Property.TABLE_PREFIX.getKey()) || key.startsWith(Property.TSERV_PREFIX.getKey()) || key.startsWith(Property.LOGGER_PREFIX.getKey()) || key.startsWith(Property.MASTER_PREFIX.getKey()) || key.startsWith(Property.GC_PREFIX.getKey()) - || key.startsWith(Property.MONITOR_PREFIX.getKey() + "banner.") || key.startsWith(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey()); + || key.startsWith(Property.MONITOR_PREFIX.getKey() + "banner.") || key.startsWith(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey()) + || key.startsWith(Property.TABLE_COMPACTION_STRATEGY_PREFIX.getKey()); } public static Property getPropertyByKey(String key) { @@ -592,10 +594,12 @@ public enum Property { public static Map<String,String> getCompactionStrategyOptions(AccumuloConfiguration tableConf) { Map<String,String> longNames = tableConf.getAllPropertiesWithPrefix(Property.TABLE_COMPACTION_STRATEGY_PREFIX); + log.info("longNames " + longNames); Map<String,String> result = new HashMap<String, String>(); for (Entry<String,String> entry : longNames.entrySet()) { - result.put(entry.getKey().substring(0, Property.TABLE_COMPACTION_STRATEGY_PREFIX.getKey().length()), entry.getValue()); + result.put(entry.getKey().substring(Property.TABLE_COMPACTION_STRATEGY_PREFIX.getKey().length()), entry.getValue()); } + log.info("result " + result); return result; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3e3869b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java index e5ca4d6..c734ff7 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java @@ -3034,10 +3034,10 @@ public class Tablet { } - private Map<FileRef,Pair<Key,Key>> getFirstAndLastKeys(MajorCompactionRequest request) throws IOException { + private Map<FileRef,Pair<Key,Key>> getFirstAndLastKeys(SortedMap<FileRef,DataFileValue> allFiles) throws IOException { Map<FileRef,Pair<Key,Key>> result = new HashMap<FileRef,Pair<Key,Key>>(); FileOperations fileFactory = FileOperations.getInstance(); - for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) { + for (Entry<FileRef,DataFileValue> entry : allFiles.entrySet()) { FileRef file = entry.getKey(); FileSystem ns = fs.getFileSystemByPath(file.path()); FileSKVIterator openReader = fileFactory.openReader(file.path().toString(), true, ns, ns.getConf(), this.getTableConfiguration()); @@ -3111,13 +3111,15 @@ public class Tablet { // acquire file info outside of tablet lock CompactionStrategy strategy = Property.createInstanceFromPropertyName(acuTableConf, Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class, new DefaultCompactionStrategy()); strategy.init(Property.getCompactionStrategyOptions(acuTableConf)); - MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, fs, acuTableConf); - request.setFiles(datafileManager.getDatafileSizes()); - strategy.gatherInformation(request); + Map<FileRef,Pair<Key,Key>> firstAndLastKeys = null; if (reason == MajorCompactionReason.CHOP) { - firstAndLastKeys = getFirstAndLastKeys(request); + firstAndLastKeys = getFirstAndLastKeys(datafileManager.getDatafileSizes()); + } else if (reason != MajorCompactionReason.USER) { + MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, fs, acuTableConf); + request.setFiles(datafileManager.getDatafileSizes()); + strategy.gatherInformation(request); } Map<FileRef, DataFileValue> filesToCompact; @@ -3151,14 +3153,16 @@ public class Tablet { // removed by a major compaction cleanUpFiles(fs, fs.listStatus(this.location), false); } - request.setFiles(datafileManager.getDatafileSizes()); + SortedMap<FileRef,DataFileValue> allFiles = datafileManager.getDatafileSizes(); List<FileRef> inputFiles = new ArrayList<FileRef>(); - if (request.getReason() == MajorCompactionReason.CHOP) { + if (reason == MajorCompactionReason.CHOP) { // enforce rules: files with keys outside our range need to be compacted - inputFiles.addAll(findChopFiles(extent, firstAndLastKeys, request.getFiles().keySet())); - } else if (request.getReason() == MajorCompactionReason.USER) { - inputFiles.addAll(request.getFiles().keySet()); + inputFiles.addAll(findChopFiles(extent, firstAndLastKeys, allFiles.keySet())); + } else if (reason == MajorCompactionReason.USER) { + inputFiles.addAll(allFiles.keySet()); } else { + MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, fs, acuTableConf); + request.setFiles(allFiles); plan = strategy.getCompactionPlan(request); if (plan != null) inputFiles.addAll(plan.inputFiles); @@ -3172,9 +3176,9 @@ public class Tablet { droppedFiles.addAll(inputFiles); if (plan != null) droppedFiles.addAll(plan.deleteFiles); - propogateDeletes = !(droppedFiles.equals(request.getFiles().keySet())); + propogateDeletes = !(droppedFiles.equals(allFiles.keySet())); log.debug("Major compaction plan: " + plan + " propogate deletes : " + propogateDeletes); - filesToCompact = new HashMap<FileRef, DataFileValue>(request.getFiles()); + filesToCompact = new HashMap<FileRef,DataFileValue>(allFiles); filesToCompact.keySet().retainAll(inputFiles); t3 = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3e3869b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java index cde9f29..cadf16d 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java @@ -35,7 +35,7 @@ import org.apache.hadoop.fs.Path; /** * Information that can be used to determine how a tablet is to be major compacted, if needed. */ -public class MajorCompactionRequest { +public class MajorCompactionRequest implements Cloneable { final private KeyExtent extent; final private MajorCompactionReason reason; final private VolumeManager volumeManager; @@ -54,6 +54,12 @@ public class MajorCompactionRequest { this.files = Collections.emptyMap(); } + public MajorCompactionRequest(MajorCompactionRequest mcr) { + this(mcr.extent, mcr.reason, mcr.volumeManager, mcr.tableConfig); + // know this is already unmodifiable, no need to wrap again + this.files = mcr.files; + } + public KeyExtent getExtent() { return extent; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3e3869b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategy.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategy.java new file mode 100644 index 0000000..f6c62b5 --- /dev/null +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategy.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.tabletserver.compaction; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.server.fs.FileRef; + +/** + * + */ +public class SizeLimitCompactionStrategy extends DefaultCompactionStrategy { + public static final String SIZE_LIMIT_OPT = "sizeLimit"; + + private long limit; + + @Override + public void init(Map<String,String> options) { + limit = AccumuloConfiguration.getMemoryInBytes(options.get(SIZE_LIMIT_OPT)); + } + + private MajorCompactionRequest filterFiles(MajorCompactionRequest mcr) { + Map<FileRef,DataFileValue> filteredFiles = new HashMap<FileRef,DataFileValue>(); + for (Entry<FileRef,DataFileValue> entry : mcr.getFiles().entrySet()) { + if (entry.getValue().getSize() <= limit) { + filteredFiles.put(entry.getKey(), entry.getValue()); + } + } + + mcr = new MajorCompactionRequest(mcr); + mcr.setFiles(filteredFiles); + + return mcr; + } + + @Override + public boolean shouldCompact(MajorCompactionRequest request) throws IOException { + return super.shouldCompact(filterFiles(request)); + } + + @Override + public void gatherInformation(MajorCompactionRequest request) throws IOException { + super.gatherInformation(filterFiles(request)); + } + + @Override + public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException { + return super.getCompactionPlan(filterFiles(request)); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3e3869b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategyTest.java b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategyTest.java new file mode 100644 index 0000000..daeb748 --- /dev/null +++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategyTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.tabletserver.compaction; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; + +/** + * + */ +public class SizeLimitCompactionStrategyTest { + private Map<FileRef,DataFileValue> nfl(String... sa) { + + HashMap<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>(); + for (int i = 0; i < sa.length; i += 2) { + ret.put(new FileRef(sa[i]), new DataFileValue(AccumuloConfiguration.getMemoryInBytes(sa[i + 1]), 1)); + } + + return ret; + } + + @Test + public void testLimits() throws IOException { + SizeLimitCompactionStrategy slcs = new SizeLimitCompactionStrategy(); + HashMap<String,String> opts = new HashMap<String,String>(); + opts.put(SizeLimitCompactionStrategy.SIZE_LIMIT_OPT, "1G"); + + slcs.init(opts); + + KeyExtent ke = new KeyExtent(new Text("0"), null, null); + MajorCompactionRequest mcr = new MajorCompactionRequest(ke, MajorCompactionReason.NORMAL, null, AccumuloConfiguration.getDefaultConfiguration()); + + mcr.setFiles(nfl("f1", "2G", "f2", "2G", "f3", "2G", "f4", "2G")); + + Assert.assertFalse(slcs.shouldCompact(mcr)); + Assert.assertEquals(0, slcs.getCompactionPlan(mcr).inputFiles.size()); + Assert.assertEquals(4, mcr.getFiles().size()); + + mcr.setFiles(nfl("f1", "2G", "f2", "2G", "f3", "2G", "f4", "2G", "f5", "500M", "f6", "500M", "f7", "500M", "f8", "500M")); + + Assert.assertTrue(slcs.shouldCompact(mcr)); + Assert.assertEquals(nfl("f5", "500M", "f6", "500M", "f7", "500M", "f8", "500M").keySet(), new HashSet<FileRef>(slcs.getCompactionPlan(mcr).inputFiles)); + Assert.assertEquals(8, mcr.getFiles().size()); + } +}