Repository: accumulo Updated Branches: refs/heads/master 04774b171 -> 524a81392
ACCUMULO-3134 Enable file selection and output configuration in compact command Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/524a8139 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/524a8139 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/524a8139 Branch: refs/heads/master Commit: 524a813925d69892c21d5db401e76350c00804f9 Parents: 04774b1 Author: ke...@deenlo.com <ke...@deenlo.com> Authored: Tue Dec 9 14:34:33 2014 -0500 Committer: ke...@deenlo.com <ke...@deenlo.com> Committed: Tue Dec 9 14:34:33 2014 -0500 ---------------------------------------------------------------------- .../core/compaction/CompactionSettings.java | 86 +++++++++ .../ConfigurableCompactionStrategy.java | 179 +++++++++++++++++++ .../ConfigurableCompactionStrategyTest.java | 81 +++++++++ .../java/org/apache/accumulo/shell/Shell.java | 1 + .../accumulo/shell/commands/CompactCommand.java | 78 +++++++- .../org/apache/accumulo/test/ShellServerIT.java | 107 +++++++++++ 6 files changed, 529 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java b/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java new file mode 100644 index 0000000..a45a692 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.compaction; + +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; + +import com.google.common.base.Preconditions; + +interface Type { + String convert(String str); +} + +class SizeType implements Type { + @Override + public String convert(String str) { + long size = AccumuloConfiguration.getMemoryInBytes(str); + Preconditions.checkArgument(size > 0); + return Long.toString(size); + } +} + +class PatternType implements Type { + @Override + public String convert(String str) { + // ensure it compiles + Pattern.compile(str); + return str; + } +} + +class UIntType implements Type { + @Override + public String convert(String str) { + Preconditions.checkArgument(Integer.parseInt(str) > 0); + return str; + } +} + +class StringType implements Type { + @Override + public String convert(String str) { + return str; + } +} + +public enum CompactionSettings { + + SF_GT_ESIZE_OPT(new SizeType()), + SF_LT_ESIZE_OPT(new SizeType()), + SF_NAME_RE_OPT(new PatternType()), + SF_PATH_RE_OPT(new PatternType()), + MIN_FILES_OPT(new UIntType()), + OUTPUT_COMPRESSION_OPT(new StringType()), + OUTPUT_BLOCK_SIZE_OPT(new SizeType()), + OUTPUT_HDFS_BLOCK_SIZE_OPT(new SizeType()), + OUTPUT_INDEX_BLOCK_SIZE_OPT(new SizeType()), + OUTPUT_REPLICATION_OPT(new UIntType()); + + private Type type; + + private CompactionSettings(Type type) { + this.type = type; + } + + public void put(Map<String,String> options, String val) { + options.put(name(), type.convert(val)); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java new file mode 100644 index 0000000..ba3ea42 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.tserver.compaction.strategies; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.accumulo.core.compaction.CompactionSettings; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.tserver.compaction.CompactionPlan; +import org.apache.accumulo.tserver.compaction.CompactionStrategy; +import org.apache.accumulo.tserver.compaction.MajorCompactionRequest; +import org.apache.accumulo.tserver.compaction.WriteParameters; +import org.apache.hadoop.fs.Path; + +public class ConfigurableCompactionStrategy extends CompactionStrategy { + + private static interface Test { + boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request); + } + + private static abstract class FileSizeTest implements Test { + private final long esize; + + private FileSizeTest(String s) { + this.esize = Long.parseLong(s); + } + + @Override + public boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request) { + return shouldCompact(file.getValue().getSize(), esize); + } + + public abstract boolean shouldCompact(long fsize, long esize); + } + + private static abstract class PatternPathTest implements Test { + private Pattern pattern; + + private PatternPathTest(String p) { + this.pattern = Pattern.compile(p); + } + + @Override + public boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request) { + return pattern.matcher(getInput(file.getKey().path())).matches(); + } + + public abstract String getInput(Path path); + + } + + private List<Test> tests = new ArrayList<>(); + private boolean andTest = true; + private int minFiles = 1; + private WriteParameters writeParams = new WriteParameters(); + + public void init(Map<String,String> options) { + + Set<Entry<String,String>> es = options.entrySet(); + for (Entry<String,String> entry : es) { + + switch (CompactionSettings.valueOf(entry.getKey())) { + case SF_LT_ESIZE_OPT: + tests.add(new FileSizeTest(entry.getValue()) { + @Override + public boolean shouldCompact(long fsize, long esize) { + return fsize < esize; + } + }); + break; + case SF_GT_ESIZE_OPT: + tests.add(new FileSizeTest(entry.getValue()) { + @Override + public boolean shouldCompact(long fsize, long esize) { + return fsize > esize; + } + }); + break; + case SF_NAME_RE_OPT: + tests.add(new PatternPathTest(entry.getValue()) { + @Override + public String getInput(Path path) { + return path.getName(); + } + }); + break; + case SF_PATH_RE_OPT: + tests.add(new PatternPathTest(entry.getValue()) { + @Override + public String getInput(Path path) { + return path.toString(); + } + }); + break; + case MIN_FILES_OPT: + minFiles = Integer.parseInt(entry.getValue()); + break; + case OUTPUT_COMPRESSION_OPT: + writeParams.setCompressType(entry.getValue()); + break; + case OUTPUT_BLOCK_SIZE_OPT: + writeParams.setBlockSize(Long.parseLong(entry.getValue())); + break; + case OUTPUT_INDEX_BLOCK_SIZE_OPT: + writeParams.setIndexBlockSize(Long.parseLong(entry.getValue())); + break; + case OUTPUT_HDFS_BLOCK_SIZE_OPT: + writeParams.setHdfsBlockSize(Long.parseLong(entry.getValue())); + break; + case OUTPUT_REPLICATION_OPT: + writeParams.setReplication(Integer.parseInt(entry.getValue())); + break; + default: + throw new IllegalArgumentException("Unknown option " + entry.getKey()); + } + } + } + + private List<FileRef> getFilesToCompact(MajorCompactionRequest request) { + List<FileRef> filesToCompact = new ArrayList<>(); + + for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) { + boolean compact = false; + for (Test test : tests) { + if (andTest) { + compact = test.shouldCompact(entry, request); + if (!compact) + break; + } else { + compact |= test.shouldCompact(entry, request); + } + } + + if (compact || tests.isEmpty()) + filesToCompact.add(entry.getKey()); + } + return filesToCompact; + } + + @Override + public boolean shouldCompact(MajorCompactionRequest request) throws IOException { + return getFilesToCompact(request).size() >= minFiles; + } + + @Override + public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException { + List<FileRef> filesToCompact = getFilesToCompact(request); + if (filesToCompact.size() >= minFiles) { + CompactionPlan plan = new CompactionPlan(); + plan.inputFiles.addAll(filesToCompact); + plan.writeParameters = writeParams; + + return plan; + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java new file mode 100644 index 0000000..a896537 --- /dev/null +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.tserver.compaction.strategies; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.compaction.CompactionSettings; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.tserver.compaction.CompactionPlan; +import org.apache.accumulo.tserver.compaction.MajorCompactionReason; +import org.apache.accumulo.tserver.compaction.MajorCompactionRequest; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; + +public class ConfigurableCompactionStrategyTest { + + // file selection options are adequately tested by ShellServerIT + + @Test + public void testOutputOptions() throws Exception { + MajorCompactionRequest mcr = new MajorCompactionRequest(new KeyExtent(new Text("1"), null, null), MajorCompactionReason.USER, null, null); + + Map<FileRef,DataFileValue> files = new HashMap<>(); + files.put(new FileRef("hdfs://nn1/accumulo/tables/1/t-009/F00001.rf"), new DataFileValue(50000, 400)); + mcr.setFiles(files); + + // test setting no output options + ConfigurableCompactionStrategy ccs = new ConfigurableCompactionStrategy(); + + Map<String,String> opts = new HashMap<>(); + ccs.init(opts); + + CompactionPlan plan = ccs.getCompactionPlan(mcr); + + Assert.assertEquals(0, plan.writeParameters.getBlockSize()); + Assert.assertEquals(0, plan.writeParameters.getHdfsBlockSize()); + Assert.assertEquals(0, plan.writeParameters.getIndexBlockSize()); + Assert.assertEquals(0, plan.writeParameters.getReplication()); + Assert.assertEquals(null, plan.writeParameters.getCompressType()); + + // test setting all output options + ccs = new ConfigurableCompactionStrategy(); + + CompactionSettings.OUTPUT_BLOCK_SIZE_OPT.put(opts, "64K"); + CompactionSettings.OUTPUT_COMPRESSION_OPT.put(opts, "snappy"); + CompactionSettings.OUTPUT_HDFS_BLOCK_SIZE_OPT.put(opts, "256M"); + CompactionSettings.OUTPUT_INDEX_BLOCK_SIZE_OPT.put(opts, "32K"); + CompactionSettings.OUTPUT_REPLICATION_OPT.put(opts, "5"); + + ccs.init(opts); + + plan = ccs.getCompactionPlan(mcr); + + Assert.assertEquals(AccumuloConfiguration.getMemoryInBytes("64K"), plan.writeParameters.getBlockSize()); + Assert.assertEquals(AccumuloConfiguration.getMemoryInBytes("256M"), plan.writeParameters.getHdfsBlockSize()); + Assert.assertEquals(AccumuloConfiguration.getMemoryInBytes("32K"), plan.writeParameters.getIndexBlockSize()); + Assert.assertEquals(5, plan.writeParameters.getReplication()); + Assert.assertEquals("snappy", plan.writeParameters.getCompressType()); + + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/shell/src/main/java/org/apache/accumulo/shell/Shell.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java b/shell/src/main/java/org/apache/accumulo/shell/Shell.java index 8927ee0..8aadd68 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java +++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java @@ -1104,6 +1104,7 @@ public class Shell extends ShellOptions { public static final void setDebugging(boolean debuggingEnabled) { Logger.getLogger(Constants.CORE_PACKAGE_NAME).setLevel(debuggingEnabled ? Level.TRACE : Level.INFO); + Logger.getLogger(Shell.class.getPackage().getName()).setLevel(debuggingEnabled ? Level.TRACE : Level.INFO); } public static final boolean isDebuggingEnabled() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java index 660630e..9e599ae 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.CompactionStrategyConfig; +import org.apache.accumulo.core.compaction.CompactionSettings; import org.apache.accumulo.shell.Shell; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; @@ -35,6 +36,10 @@ import org.apache.commons.cli.Options; public class CompactCommand extends TableOperation { private Option noFlushOption, waitOpt, profileOpt, cancelOpt, strategyOpt, strategyConfigOpt; + // file selection and file output options + private Option enameOption, epathOption, sizeLtOption, sizeGtOption, minFilesOption, outBlockSizeOpt, outHdfsBlockSizeOpt, outIndexBlockSizeOpt, + outCompressionOpt, outReplication; + private CompactionConfig compactionConfig = null; boolean override = false; @@ -43,7 +48,9 @@ public class CompactCommand extends TableOperation { @Override public String description() { - return "sets all tablets for a table to major compact as soon as possible (based on current time)"; + return "Initiates a major compaction on tablets within the specified range that have one or more files. If no file selection options are specified, then " + + "all files will be compacted. Options that configure output settings are only applied to this compaction and not later compactions. If multiple " + + "concurrent user initiated compactions specify iterators or a compaction strategy, then all but one will fail to start."; } protected void doTableOp(final Shell shellState, final String tableName) throws AccumuloException, AccumuloSecurityException { @@ -70,7 +77,29 @@ public class CompactCommand extends TableOperation { } } } - + + private void put(CommandLine cl, Map<String,String> opts, Option opt, CompactionSettings setting) { + if (cl.hasOption(opt.getLongOpt())) + setting.put(opts, cl.getOptionValue(opt.getLongOpt())); + } + + private Map<String,String> getConfigurableCompactionStrategyOpts(CommandLine cl) { + Map<String,String> opts = new HashMap<>(); + + put(cl, opts, enameOption, CompactionSettings.SF_NAME_RE_OPT); + put(cl, opts, epathOption, CompactionSettings.SF_PATH_RE_OPT); + put(cl, opts, sizeLtOption, CompactionSettings.SF_LT_ESIZE_OPT); + put(cl, opts, sizeGtOption, CompactionSettings.SF_GT_ESIZE_OPT); + put(cl, opts, minFilesOption, CompactionSettings.MIN_FILES_OPT); + put(cl, opts, outCompressionOpt, CompactionSettings.OUTPUT_COMPRESSION_OPT); + put(cl, opts, outBlockSizeOpt, CompactionSettings.OUTPUT_BLOCK_SIZE_OPT); + put(cl, opts, outHdfsBlockSizeOpt, CompactionSettings.OUTPUT_HDFS_BLOCK_SIZE_OPT); + put(cl, opts, outIndexBlockSizeOpt, CompactionSettings.OUTPUT_INDEX_BLOCK_SIZE_OPT); + put(cl, opts, outReplication, CompactionSettings.OUTPUT_REPLICATION_OPT); + + return opts; + } + @Override public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception { @@ -101,7 +130,12 @@ public class CompactCommand extends TableOperation { compactionConfig.setIterators(new ArrayList<>(iterators)); } + Map<String,String> configurableCompactOpt = getConfigurableCompactionStrategyOpts(cl); + if (cl.hasOption(strategyOpt.getOpt())) { + if (configurableCompactOpt.size() > 0) + throw new IllegalArgumentException("Can not specify compaction strategy with file selection and file output options."); + CompactionStrategyConfig csc = new CompactionStrategyConfig(cl.getOptionValue(strategyOpt.getOpt())); if (cl.hasOption(strategyConfigOpt.getOpt())) { Map<String,String> props = new HashMap<>(); @@ -117,9 +151,19 @@ public class CompactCommand extends TableOperation { compactionConfig.setCompactionStrategy(csc); } + if (configurableCompactOpt.size() > 0) { + CompactionStrategyConfig csc = new CompactionStrategyConfig("org.apache.accumulo.tserver.compaction.strategies.ConfigurableCompactionStrategy"); + csc.setOptions(configurableCompactOpt); + compactionConfig.setCompactionStrategy(csc); + } + return super.execute(fullCommand, cl, shellState); } + private Option newLAO(String lopt, String desc) { + return new Option(null, lopt, true, desc); + } + @Override public Options getOptions() { final Options opts = super.getOptions(); @@ -131,7 +175,7 @@ public class CompactCommand extends TableOperation { waitOpt = new Option("w", "wait", false, "wait for compact to finish"); opts.addOption(waitOpt); - profileOpt = new Option("pn", "profile", true, "iterator profile name"); + profileOpt = new Option("pn", "profile", true, "Iterator profile name."); profileOpt.setArgName("profile"); opts.addOption(profileOpt); @@ -143,6 +187,34 @@ public class CompactCommand extends TableOperation { cancelOpt = new Option(null, "cancel", false, "cancel user initiated compactions"); opts.addOption(cancelOpt); + enameOption = newLAO("sf-ename", "Select files using regular expression to match file names. Only matches against last part of path."); + opts.addOption(enameOption); + epathOption = newLAO("sf-epath", "Select files using regular expression to match file paths to compact. Matches against full path."); + opts.addOption(epathOption); + sizeLtOption = newLAO("sf-lt-esize", + "Selects files less than specified size. Uses the estimated size of file in metadata table. Can use K,M, and G suffixes"); + opts.addOption(sizeLtOption); + sizeGtOption = newLAO("sf-gt-esize", + "Selects files greater than specified size. Uses the estimated size of file in metadata table. Can use K,M, and G suffixes"); + opts.addOption(sizeGtOption); + minFilesOption = newLAO("min-files", + "Only compacts if at least the specified number of files are selected. When no file selection criteria are given, all files are selected."); + opts.addOption(minFilesOption); + outBlockSizeOpt = newLAO("out-data-bs", + "Rfile data block size to use for compaction output file. Can use K,M, and G suffixes. Uses table settings if not specified."); + opts.addOption(outBlockSizeOpt); + outHdfsBlockSizeOpt = newLAO("out-hdfs-bs", + "HDFS block size to use for compaction output file. Can use K,M, and G suffixes. Uses table settings if not specified."); + opts.addOption(outHdfsBlockSizeOpt); + outIndexBlockSizeOpt = newLAO("out-index-bs", + "Rfile index block size to use for compaction output file. Can use K,M, and G suffixes. Uses table settings if not specified."); + opts.addOption(outIndexBlockSizeOpt); + outCompressionOpt = newLAO("out-compress", + "Compression to use for compaction output file. Either snappy, gz, lzo, or none. Uses table settings if not specified."); + opts.addOption(outCompressionOpt); + outReplication = newLAO("out-replication", "HDFS replication to use for compaction output file. Uses table settings if not specified."); + opts.addOption(outReplication); + return opts; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/524a8139/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java index d878c7f..e4104ce 100644 --- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Random; import jline.console.ConsoleReader; @@ -753,6 +754,112 @@ public class ShellServerIT extends SharedMiniClusterIT { } @Test + public void testCompactionSelection() throws Exception { + final String table = name.getMethodName(); + final String clone = table + "_clone"; + + ts.exec("createtable " + table); + ts.exec("insert a b c d"); + ts.exec("flush -w"); + ts.exec("insert x y z v"); + ts.exec("flush -w"); + + ts.exec("clonetable -s " + Property.TABLE_MAJC_RATIO.getKey() + "=10 " + table + " " + clone); + + ts.exec("table " + clone); + ts.exec("insert m n l o"); + ts.exec("flush -w"); + + String tableId = getTableId(table); + String cloneId = getTableId(clone); + + assertEquals(3, countFiles(cloneId)); + + // compact only files from src table + ts.exec("compact -t " + clone + " -w --sf-epath .*tables/" + tableId + ".*"); + + assertEquals(2, countFiles(cloneId)); + + ts.exec("insert r s t u"); + ts.exec("flush -w"); + + assertEquals(3, countFiles(cloneId)); + + // compact all flush files + ts.exec("compact -t " + clone + " -w --sf-ename F.*"); + + assertEquals(2, countFiles(cloneId)); + + // create two large files + Random rand = new Random(); + StringBuilder sb = new StringBuilder("insert b v q "); + for (int i = 0; i < 10000; i++) { + sb.append('a' + rand.nextInt(26)); + } + + ts.exec(sb.toString()); + ts.exec("flush -w"); + + ts.exec(sb.toString()); + ts.exec("flush -w"); + + assertEquals(4, countFiles(cloneId)); + + // compact only small files + ts.exec("compact -t " + clone + " -w --sf-lt-esize 1000"); + + assertEquals(3, countFiles(cloneId)); + + // compact large files if 3 or more + ts.exec("compact -t " + clone + " -w --sf-gt-esize 1K --min-files 3"); + + assertEquals(3, countFiles(cloneId)); + + // compact large files if 2 or more + ts.exec("compact -t " + clone + " -w --sf-gt-esize 1K --min-files 2"); + + assertEquals(2, countFiles(cloneId)); + + // compact if tablet has 3 or more files + ts.exec("compact -t " + clone + " -w --min-files 3"); + + assertEquals(2, countFiles(cloneId)); + + // compact if tablet has 2 or more files + ts.exec("compact -t " + clone + " -w --min-files 2"); + + assertEquals(1, countFiles(cloneId)); + + // create two small and one large flush files in order to test AND + ts.exec(sb.toString()); + ts.exec("flush -w"); + + ts.exec("insert m n l o"); + ts.exec("flush -w"); + + ts.exec("insert m n l o"); + ts.exec("flush -w"); + + assertEquals(4, countFiles(cloneId)); + + // should only compact two small flush files leaving large flush file + ts.exec("compact -t " + clone + " -w --sf-ename F.* --sf-lt-esize 1K"); + + assertEquals(3, countFiles(cloneId)); + } + + @Test + public void testCompactionSelectionAndStrategy() throws Exception { + + final String table = name.getMethodName(); + + ts.exec("createtable " + table); + + // expect this to fail + ts.exec("compact -t " + table + " -w --sf-ename F.* -s " + TestCompactionStrategy.class.getName() + " -sc inputPrefix=F,dropPrefix=A", false); + } + + @Test public void constraint() throws Exception { final String table = name.getMethodName();