Repository: accumulo Updated Branches: refs/heads/master be4aade67 -> 47a091ade
ACCUMULO-3467 fixed bug with concurrent compactions Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/47a091ad Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/47a091ad Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/47a091ad Branch: refs/heads/master Commit: 47a091adec4a40cdf6852cc0f8c432c15034cb5e Parents: be4aade Author: Keith Turner <ktur...@apache.org> Authored: Thu Jan 15 10:20:52 2015 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Thu Jan 15 10:20:52 2015 -0500 ---------------------------------------------------------------------- .../core/client/admin/CompactionConfig.java | 9 ++--- .../client/admin/CompactionStrategyConfig.java | 10 ++++++ .../core/client/admin/TableOperations.java | 3 +- .../impl/CompactionStrategyConfigUtil.java | 9 +++++ .../master/tableOps/UserCompactionConfig.java | 1 + .../accumulo/master/tableOps/CompactRange.java | 12 +++++-- .../accumulo/test/UserCompactionStrategyIT.java | 35 ++++++++++++++++++++ 7 files changed, 69 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java b/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java index 38e5efd..064d836 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java @@ -20,9 +20,9 @@ package org.apache.accumulo.core.client.admin; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.impl.CompactionStrategyConfigUtil; import org.apache.hadoop.io.Text; import com.google.common.base.Preconditions; @@ -39,12 +39,7 @@ public class CompactionConfig { private boolean flush = true; private boolean wait = true; private List<IteratorSetting> iterators = Collections.emptyList(); - private CompactionStrategyConfig compactionStrategy = new CompactionStrategyConfig("org.apache.accumulo.tserver.compaction.EverythingCompactionStrategy") { - @Override - public CompactionStrategyConfig setOptions(Map<String,String> opts) { - throw new UnsupportedOperationException(); - } - }; + private CompactionStrategyConfig compactionStrategy = CompactionStrategyConfigUtil.DEFAULT_STRATEGY; /** * @param start http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java b/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java index c23b511..0992ba9 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java @@ -71,4 +71,14 @@ public class CompactionStrategyConfig { public Map<String,String> getOptions() { return Collections.unmodifiableMap(options); } + + @Override + public boolean equals(Object o) { + if (o instanceof CompactionStrategyConfig) { + CompactionStrategyConfig ocsc = (CompactionStrategyConfig) o; + return className.equals(ocsc.className) && options.equals(ocsc.options); + } + + return false; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java index 5c1260c..41021b1 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java @@ -300,7 +300,8 @@ public interface TableOperations { * @param end * last tablet to be merged contains this row, null means the last tablet in table * @param iterators - * A set of iterators that will be applied to each tablet compacted + * A set of iterators that will be applied to each tablet compacted. If two or more concurrent calls to compact pass iterators, then only one will + * succeed and the others will fail. * @param flush * when true, table memory is flushed before compaction starts * @param wait http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java b/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java index 8dce877..758f445 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java @@ -25,12 +25,21 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.util.HashMap; +import java.util.Map; import java.util.Map.Entry; import org.apache.accumulo.core.client.admin.CompactionStrategyConfig; public class CompactionStrategyConfigUtil { + public static final CompactionStrategyConfig DEFAULT_STRATEGY = new CompactionStrategyConfig( + "org.apache.accumulo.tserver.compaction.EverythingCompactionStrategy") { + @Override + public CompactionStrategyConfig setOptions(Map<String,String> opts) { + throw new UnsupportedOperationException(); + } + }; + private static final int MAGIC = 0xcc5e6024; public static void encode(DataOutput dout, CompactionStrategyConfig csc) throws IOException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/UserCompactionConfig.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/UserCompactionConfig.java b/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/UserCompactionConfig.java index 98d7fd7..02c6ac3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/UserCompactionConfig.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/UserCompactionConfig.java @@ -46,6 +46,7 @@ public class UserCompactionConfig implements Writable { startRow = null; endRow = null; iterators = Collections.emptyList(); + compactionStrategy = CompactionStrategyConfigUtil.DEFAULT_STRATEGY; } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java index fd7decf..580852d 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java @@ -31,6 +31,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionStrategyConfig; +import org.apache.accumulo.core.client.impl.CompactionStrategyConfigUtil; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; @@ -61,6 +62,8 @@ import org.apache.log4j.Logger; import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException.NoNodeException; +import com.google.common.base.Preconditions; + class CompactionDriver extends MasterRepo { private static final long serialVersionUID = 1L; @@ -207,11 +210,16 @@ public class CompactRange extends MasterRepo { public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators, CompactionStrategyConfig compactionStrategy) throws ThriftTableOperationException { + + Preconditions.checkNotNull(tableId, "Invalid argument: null tableId"); + Preconditions.checkNotNull(iterators, "Invalid argument: null iterator list"); + Preconditions.checkNotNull(compactionStrategy, "Invalid argument: null compactionStrategy"); + this.tableId = tableId; this.startRow = startRow.length == 0 ? null : startRow; this.endRow = endRow.length == 0 ? null : endRow; - if (iterators.size() > 0 || compactionStrategy != null) { + if (iterators.size() > 0 || !compactionStrategy.equals(CompactionStrategyConfigUtil.DEFAULT_STRATEGY)) { this.config = WritableUtils.toByteArray(new UserCompactionConfig(this.startRow, this.endRow, iterators, compactionStrategy)); } @@ -249,7 +257,7 @@ public class CompactRange extends MasterRepo { continue; // skip self throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, - "Another compaction with iterators is running"); + "Another compaction with iterators and/or a compaction strategy is running"); } StringBuilder encodedIterators = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/47a091ad/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java index 5421f52..7a3162b 100644 --- a/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java +++ b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java @@ -26,6 +26,7 @@ import java.util.Random; import java.util.Set; import java.util.TreeSet; +import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; @@ -44,6 +45,7 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.AccumuloClusterIT; import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.test.functional.FunctionalTestUtils; +import org.apache.accumulo.test.functional.SlowIterator; import org.apache.accumulo.tserver.compaction.CompactionPlan; import org.apache.accumulo.tserver.compaction.CompactionStrategy; import org.apache.accumulo.tserver.compaction.MajorCompactionRequest; @@ -300,6 +302,39 @@ public class UserCompactionStrategyIT extends AccumuloClusterIT { } + @Test + public void testConcurrent() throws Exception { + // two compactions without iterators or strategy should be able to run concurrently + + Connector c = getConnector(); + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + // write random data because its very unlikely it will compress + writeRandomValue(c, tableName, 1 << 16); + writeRandomValue(c, tableName, 1 << 16); + + c.tableOperations().compact(tableName, new CompactionConfig().setWait(false)); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName)); + + writeRandomValue(c, tableName, 1 << 16); + + IteratorSetting iterConfig = new IteratorSetting(30, SlowIterator.class); + SlowIterator.setSleepTime(iterConfig, 1000); + + long t1 = System.currentTimeMillis(); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(false).setIterators(Arrays.asList(iterConfig))); + try { + // this compaction should fail because previous one set iterators + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + if (System.currentTimeMillis() - t1 < 2000) + Assert.fail("Expected compaction to fail because another concurrent compaction set iterators"); + } catch (AccumuloException e) {} + } + void writeRandomValue(Connector c, String tableName, int size) throws Exception { Random rand = new Random();