Updated Branches: refs/heads/master 8ec4cb840 -> bcaefcd2e
ACCUMULO-1451 from feedback from [~kturner], removed concept of passes, multiple output files, made interface abstract class, moved rules for USER and CHOP into tablet Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c16b6ae4 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c16b6ae4 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c16b6ae4 Branch: refs/heads/master Commit: c16b6ae4fc7e607011c1ae7c01f3ea3bd0c182ba Parents: 3e74ee6 Author: Eric Newton <eric.new...@gmail.com> Authored: Mon Oct 21 15:34:09 2013 -0400 Committer: Eric Newton <eric.new...@gmail.com> Committed: Mon Oct 21 15:34:09 2013 -0400 ---------------------------------------------------------------------- .../accumulo/server/tabletserver/Compactor.java | 12 +-- .../accumulo/server/tabletserver/Tablet.java | 99 +++++++++++++++++--- .../TabletServerResourceManager.java | 2 +- .../tabletserver/compaction/CompactionPass.java | 34 ------- .../tabletserver/compaction/CompactionPlan.java | 15 ++- .../compaction/CompactionStrategy.java | 26 +---- .../compaction/DefaultCompactionStrategy.java | 91 ++---------------- .../tabletserver/compaction/DefaultWriter.java | 33 ------- .../DefaultCompactionStrategyTest.java | 68 +------------- .../org/apache/accumulo/test/ShellServerIT.java | 4 +- .../test/TestConfigurableMajorCompactionIT.java | 15 +-- 11 files changed, 117 insertions(+), 282 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/c16b6ae4/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java index 760b6e0..5751f1e 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java @@ -63,9 +63,6 @@ import org.apache.accumulo.server.problems.ProblemReportingIterator; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason; -import org.apache.accumulo.server.tabletserver.compaction.CompactionStrategy; -import org.apache.accumulo.server.tabletserver.compaction.CompactionStrategy.Writer; -import org.apache.accumulo.server.tabletserver.compaction.DefaultWriter; import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionReason; import org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; @@ -165,7 +162,6 @@ public class Compactor implements Callable<CompactionStats> { private long compactorID = nextCompactorID.getAndIncrement(); protected volatile Thread thread; - private Writer writer; private synchronized void setLocalityGroup(String name) { this.currentLocalityGroup = name; @@ -289,7 +285,7 @@ public class Compactor implements Callable<CompactionStats> { } Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes, - TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason, CompactionStrategy.Writer writer) { + TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) { this.extent = extent; this.conf = conf; this.fs = fs; @@ -301,14 +297,13 @@ public class Compactor implements Callable<CompactionStats> { this.env = env; this.iterators = iterators; this.reason = reason; - this.writer = writer; startTime = System.currentTimeMillis(); } Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes, TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) { - this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null, new DefaultWriter()); + this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null); } public VolumeManager getFileSystem() { @@ -466,7 +461,6 @@ public class Compactor implements Callable<CompactionStats> { private void compactLocalityGroup(String lgName, Set<ByteSequence> columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats) throws IOException, CompactionCanceledException { ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(filesToCompact.size()); - List<FileSKVWriter> writers = Collections.singletonList(mfw); Span span = Trace.start("compact"); try { long entriesCompacted = 0; @@ -505,7 +499,7 @@ public class Compactor implements Callable<CompactionStats> { Span write = Trace.start("write"); try { while (itr.hasTop() && env.isCompactionEnabled()) { - writer.write(itr.getTopKey(), itr.getTopValue(), writers); + mfw.append(itr.getTopKey(), itr.getTopValue()); itr.next(); entriesCompacted++; http://git-wip-us.apache.org/repos/asf/accumulo/blob/c16b6ae4/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java index ca561eb..bf835a2 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java @@ -3031,6 +3031,56 @@ public class Tablet { return common; } + + private Map<FileRef,Pair<Key,Key>> getFirstAndLastKeys(MajorCompactionRequest request) throws IOException { + Map<FileRef,Pair<Key,Key>> result = new HashMap<FileRef,Pair<Key,Key>>(); + FileOperations fileFactory = FileOperations.getInstance(); + for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) { + FileRef file = entry.getKey(); + FileSystem ns = fs.getFileSystemByPath(file.path()); + FileSKVIterator openReader = fileFactory.openReader(file.path().toString(), true, ns, ns.getConf(), this.getTableConfiguration()); + try { + Key first = openReader.getFirstKey(); + Key last = openReader.getLastKey(); + result.put(file, new Pair<Key,Key>(first, last)); + } finally { + openReader.close(); + } + } + return result; + } + + + List<FileRef> findChopFiles(KeyExtent extent, Map<FileRef,Pair<Key,Key>> firstAndLastKeys, Collection<FileRef> allFiles) throws IOException { + List<FileRef> result = new ArrayList<FileRef>(); + if (firstAndLastKeys == null) { + result.addAll(allFiles); + return result; + } + + for (FileRef file : allFiles) { + Pair<Key,Key> pair = firstAndLastKeys.get(file); + if (pair == null) { + // file was created or imported after we obtained the first and last keys... there + // are a few options here... throw an exception which will cause the compaction to + // retry and also cause ugly error message that the admin has to ignore... could + // go get the first and last key, but this code is called while the tablet lock + // is held... or just compact the file.... + result.add(file); + } else { + Key first = pair.getFirst(); + Key last = pair.getSecond(); + // If first and last are null, it's an empty file. Add it to the compact set so it goes away. + if ((first == null && last == null) || !extent.contains(first.getRow()) || !extent.contains(last.getRow())) { + result.add(file); + } + } + } + return result; + } + + + /** * Returns true if this tablet needs to be split * @@ -3061,13 +3111,20 @@ public class Tablet { MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, fs, acuTableConf); request.setFiles(datafileManager.getDatafileSizes()); strategy.gatherInformation(request); - + Map<FileRef,Pair<Key,Key>> firstAndLastKeys = null; + + if (reason == MajorCompactionReason.CHOP) { + firstAndLastKeys = getFirstAndLastKeys(request); + } + Map<FileRef, DataFileValue> filesToCompact; int maxFilesToCompact = acuTableConf.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN); CompactionStats majCStats = new CompactionStats(); - CompactionPlan plan; + CompactionPlan plan = null; + + boolean propogateDeletes = false; synchronized (this) { // plan all that work that needs to be done in the sync block... then do the actual work @@ -3092,29 +3149,43 @@ public class Tablet { cleanUpFiles(fs, fs.listStatus(this.location), false); } request.setFiles(datafileManager.getDatafileSizes()); - plan = strategy.getCompactionPlan(request); - if (plan == null || plan.passes.isEmpty()) { + List<FileRef> inputFiles = new ArrayList<FileRef>(); + if (request.getReason() == MajorCompactionReason.CHOP) { + // enforce rules: files with keys outside our range need to be compacted + inputFiles.addAll(findChopFiles(extent, firstAndLastKeys, request.getFiles().keySet())); + } else if (request.getReason() == MajorCompactionReason.USER) { + inputFiles.addAll(request.getFiles().keySet()); + } else { + plan = strategy.getCompactionPlan(request); + if (plan != null) + inputFiles.addAll(plan.inputFiles); + } + + if (inputFiles.isEmpty()) { return majCStats; } - log.debug("Major compaction plan: " + plan); - if (plan.passes.size() > 1) - log.info("Multiple passes presently not supported, only performing the first pass"); - if (plan.passes.get(0).outputFiles != 1) - log.warn("Only one output file is supported, but " + plan.passes.get(0).outputFiles + " requested"); + // If no original files will exist at the end of the compaction, we do not have to propogate deletes + Set<FileRef> droppedFiles = new HashSet<FileRef>(); + droppedFiles.addAll(inputFiles); + if (plan != null) + droppedFiles.addAll(plan.deleteFiles); + propogateDeletes = !(droppedFiles.equals(request.getFiles().keySet())); + log.debug("Major compaction plan: " + plan + " propogate deletes : " + propogateDeletes); filesToCompact = new HashMap<FileRef, DataFileValue>(request.getFiles()); - filesToCompact.keySet().retainAll(plan.passes.get(0).inputFiles); + filesToCompact.keySet().retainAll(inputFiles); t3 = System.currentTimeMillis(); datafileManager.reserveMajorCompactingFiles(filesToCompact.keySet()); } + try { log.debug(String.format("MajC initiate lock %.2f secs, wait %.2f secs", (t3 - t2) / 1000.0, (t2 - t1) / 1000.0)); Pair<Long,List<IteratorSetting>> compactionId = null; - if (!plan.propogateDeletes) { + if (!propogateDeletes) { // compacting everything, so update the compaction id in !METADATA try { compactionId = getCompactionID(); @@ -3153,7 +3224,7 @@ public class Tablet { Set<FileRef> smallestFiles = removeSmallest(filesToCompact, numToCompact); - FileRef fileName = getNextMapFilename((filesToCompact.size() == 0 && !plan.propogateDeletes) ? "A" : "C"); + FileRef fileName = getNextMapFilename((filesToCompact.size() == 0 && !propogateDeletes) ? "A" : "C"); FileRef compactTmpName = new FileRef(fileName.path().toString() + "_tmp"); Span span = Trace.start("compactFiles"); @@ -3181,8 +3252,8 @@ public class Tablet { // always propagate deletes, unless last batch boolean lastBatch = filesToCompact.isEmpty(); - Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, lastBatch ? plan.propogateDeletes : true, acuTableConf, extent, - cenv, compactionIterators, reason, strategy.getCompactionWriter()); + Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, lastBatch ? propogateDeletes : true, acuTableConf, extent, + cenv, compactionIterators, reason); CompactionStats mcs = compactor.call(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/c16b6ae4/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java index 403555a..381280d 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java @@ -568,7 +568,7 @@ public class TabletServerResourceManager { request.setFiles(tabletFiles); try { CompactionPlan plan = strategy.getCompactionPlan(request); - if (plan == null || plan.passes.isEmpty()) + if (plan == null || plan.inputFiles.isEmpty()) return false; return true; } catch (IOException ex) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/c16b6ae4/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPass.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPass.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPass.java deleted file mode 100644 index 97ad9d7..0000000 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPass.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver.compaction; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.accumulo.server.fs.FileRef; - -/** - * Information about a single compaction pass: input files and the number of output files to write. - * Presently, the number of output files must always be 1. - */ -public class CompactionPass { - public List<FileRef> inputFiles = new ArrayList<FileRef>(); - public int outputFiles = 1; - public String toString() { - return inputFiles.toString() + " -> " + outputFiles + " files"; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c16b6ae4/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java index c92788e..33f080b 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java @@ -22,23 +22,20 @@ import java.util.List; import org.apache.accumulo.server.fs.FileRef; /** - * A plan for a compaction: the passes to run over input files, producing output files to replace them, - * the files that are *not* inputs to a compaction that should simply be deleted, and weather or not to - * propagate deletes from input files to output files. + * A plan for a compaction: the input files the files that are *not* inputs to a compaction that should + * simply be deleted. */ public class CompactionPlan { - public List<CompactionPass> passes = new ArrayList<CompactionPass>(); - public List<FileRef> deleteFiles = new ArrayList<FileRef>(); - public boolean propogateDeletes = true; + public final List<FileRef> inputFiles = new ArrayList<FileRef>(); + public final List<FileRef> deleteFiles = new ArrayList<FileRef>(); + public String toString() { StringBuilder b = new StringBuilder(); - b.append(passes.toString()); + b.append(inputFiles.toString()); if (!deleteFiles.isEmpty()) { b.append(" files to be deleted "); b.append(deleteFiles); } - b.append(" propogateDeletes "); - b.append(propogateDeletes); return b.toString(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c16b6ae4/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java index 0d928b6..16e4db1 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java @@ -17,30 +17,20 @@ package org.apache.accumulo.server.tabletserver.compaction; import java.io.IOException; -import java.util.List; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.FileSKVWriter; /** * The interface for customizing major compactions. */ -public interface CompactionStrategy { +public abstract class CompactionStrategy { /** - * Called for each output key/value to determine which file should get which key/value pair. - */ - public interface Writer { - void write(Key key, Value value, List<FileSKVWriter> outputFiles) throws IOException; - } - - /** * Called prior to obtaining the tablet lock, useful for examining metadata or indexes. * @param request basic details about the tablet * @throws IOException */ - void gatherInformation(MajorCompactionRequest request) throws IOException; + public void gatherInformation(MajorCompactionRequest request) throws IOException { + + } /** * Get the plan for compacting a tablets files. Called while holding the tablet lock, so it should not be doing any blocking. @@ -48,12 +38,6 @@ public interface CompactionStrategy { * @return the plan for a major compaction * @throws IOException */ - CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException; - - /** - * Get the callback for this compaction to determine where to write the output. - * @return - */ - Writer getCompactionWriter(); + abstract public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c16b6ae4/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java index 3f88188..5aa0f98 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java @@ -19,98 +19,26 @@ package org.apache.accumulo.server.tabletserver.compaction; import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; import java.util.TreeSet; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.metadata.schema.DataFileValue; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.fs.FileRef; -public class DefaultCompactionStrategy implements CompactionStrategy { +public class DefaultCompactionStrategy extends CompactionStrategy { - Map<FileRef,Pair<Key,Key>> firstAndLastKeys = null; - - @Override - public void gatherInformation(MajorCompactionRequest request) throws IOException { - if (request.getReason() == MajorCompactionReason.CHOP) { - firstAndLastKeys = getFirstAndLastKeys(request); - } - } @Override public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException { CompactionPlan result = new CompactionPlan(); - List<FileRef> toCompact; - MajorCompactionReason reason = request.getReason(); - if (reason == MajorCompactionReason.CHOP) { - toCompact = findChopFiles(request); - } else { - toCompact = findMapFilesToCompact(request); - } - CompactionPass pass = new CompactionPass(); - pass.inputFiles = toCompact; + List<FileRef> toCompact = findMapFilesToCompact(request); if (toCompact == null || toCompact.isEmpty()) return result; - result.passes.add(pass); - result.propogateDeletes = toCompact.size() != request.getFiles().size(); - return result; - } - - private Map<FileRef,Pair<Key,Key>> getFirstAndLastKeys(MajorCompactionRequest request) throws IOException { - Map<FileRef,Pair<Key,Key>> result = new HashMap<FileRef,Pair<Key,Key>>(); - - for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) { - FileRef file = entry.getKey(); - FileSKVIterator openReader = request.openReader(file); - try { - Key first = openReader.getFirstKey(); - Key last = openReader.getLastKey(); - result.put(file, new Pair<Key,Key>(first, last)); - } finally { - openReader.close(); - } - } - return result; - } - - - List<FileRef> findChopFiles(MajorCompactionRequest request) throws IOException { - List<FileRef> result = new ArrayList<FileRef>(); - if (firstAndLastKeys == null) { - // someone called getCompactionPlan without calling gatherInformation: compact everything - result.addAll(request.getFiles().keySet()); - return result; - } - - for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) { - FileRef file = entry.getKey(); - Pair<Key,Key> pair = firstAndLastKeys.get(file); - if (pair == null) { - // file was created or imported after we obtained the first and last keys... there - // are a few options here... throw an exception which will cause the compaction to - // retry and also cause ugly error message that the admin has to ignore... could - // go get the first and last key, but this code is called while the tablet lock - // is held... or just compact the file.... - result.add(file); - } else { - Key first = pair.getFirst(); - Key last = pair.getSecond(); - // If first and last are null, it's an empty file. Add it to the compact set so it goes away. - KeyExtent extent = request.getExtent(); - if ((first == null && last == null) || !extent.contains(first.getRow()) || !extent.contains(last.getRow())) { - result.add(file); - } - } - } + result.inputFiles.addAll(toCompact); return result; } @@ -123,13 +51,17 @@ public class DefaultCompactionStrategy implements CompactionStrategy { this.size = size; } } - + private List<FileRef> findMapFilesToCompact(MajorCompactionRequest request) { MajorCompactionReason reason = request.getReason(); if (reason == MajorCompactionReason.USER) { return new ArrayList<FileRef>(request.getFiles().keySet()); } + if (reason == MajorCompactionReason.CHOP) { + // should not happen, but this is safe + return new ArrayList<FileRef>(request.getFiles().keySet()); + } if (request.getFiles().size() <= 1) return null; @@ -187,7 +119,7 @@ public class DefaultCompactionStrategy implements CompactionStrategy { TreeMap<FileRef,Long> tfc = new TreeMap<FileRef,Long>(); for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) { - tfc.put(entry.getKey(), entry.getValue().getSize()); + tfc.put(entry.getKey(), entry.getValue().getSize()); } tfc.keySet().removeAll(files); @@ -205,9 +137,4 @@ public class DefaultCompactionStrategy implements CompactionStrategy { return files; } - @Override - public Writer getCompactionWriter() { - return new DefaultWriter(); - } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c16b6ae4/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultWriter.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultWriter.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultWriter.java deleted file mode 100644 index 68cd460..0000000 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultWriter.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver.compaction; - -import java.io.IOException; -import java.util.List; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.FileSKVWriter; - -public class DefaultWriter implements CompactionStrategy.Writer { - - @Override - public void write(Key key, Value value, List<FileSKVWriter> outputFiles) throws IOException { - outputFiles.get(0).append(key, value); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c16b6ae4/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java index 99d5be1..a11bd66 100644 --- a/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java +++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java @@ -17,7 +17,6 @@ package org.apache.accumulo.server.tabletserver.compaction; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.DataInputStream; @@ -197,83 +196,26 @@ public class DefaultCompactionStrategyTest { MajorCompactionRequest request = createRequest(MajorCompactionReason.IDLE, "file1", 10, "file2", 10); s.gatherInformation(request); CompactionPlan plan = s.getCompactionPlan(request); - assertTrue(plan.passes.isEmpty()); + assertTrue(plan.inputFiles.isEmpty()); // do everything request = createRequest(MajorCompactionReason.IDLE, "file1", 10, "file2", 10, "file3", 10); s.gatherInformation(request); plan = s.getCompactionPlan(request); - assertEquals(1, plan.passes.size()); - CompactionPass pass = plan.passes.get(0); - assertEquals(3, pass.inputFiles.size()); - assertEquals(1, pass.outputFiles); - assertFalse(plan.propogateDeletes); + assertEquals(3, plan.inputFiles.size()); // do everything request = createRequest(MajorCompactionReason.USER, "file1", 10, "file2", 10); s.gatherInformation(request); plan = s.getCompactionPlan(request); - assertEquals(1, plan.passes.size()); - pass = plan.passes.get(0); - assertEquals(2, pass.inputFiles.size()); - assertEquals(1, pass.outputFiles); - assertFalse(plan.propogateDeletes); + assertEquals(2, plan.inputFiles.size()); // partial request = createRequest(MajorCompactionReason.NORMAL, "file0", 100, "file1", 10, "file2", 10, "file3", 10); s.gatherInformation(request); plan = s.getCompactionPlan(request); - assertEquals(1, plan.passes.size()); - pass = plan.passes.get(0); - assertEquals(3, pass.inputFiles.size()); - assertEquals(1, pass.outputFiles); - assertEquals(asStringSet(pass.inputFiles), asSet("file1,file2,file3".split(","))); - assertTrue(plan.propogateDeletes); + assertEquals(3, plan.inputFiles.size()); + assertEquals(asStringSet(plan.inputFiles), asSet("file1,file2,file3".split(","))); - // chop tests - // everything overlaps default tablet - request = createRequest(MajorCompactionReason.NORMAL, "file1", 10, "file2", 10, "file3", 10); - s.gatherInformation(request); - plan = s.getCompactionPlan(request); - assertEquals(1, plan.passes.size()); - pass = plan.passes.get(0); - assertEquals(3, pass.inputFiles.size()); - assertEquals(1, pass.outputFiles); - assertEquals(asStringSet(pass.inputFiles), asSet("file1,file2,file3".split(","))); - assertFalse(plan.propogateDeletes); - - // Partial overlap - KeyExtent extent = new KeyExtent(new Text("0"), new Text("n"), new Text("a")); - request = createRequest(extent, MajorCompactionReason.CHOP, "file1", 10, "file2", 10, "file3", 10); - s.gatherInformation(request); - plan = s.getCompactionPlan(request); - assertEquals(1, plan.passes.size()); - pass = plan.passes.get(0); - assertEquals(2, pass.inputFiles.size()); - assertEquals(1, pass.outputFiles); - assertEquals(asStringSet(pass.inputFiles), asSet("file2,file3".split(","))); - assertTrue(plan.propogateDeletes); - - // empty file - request = createRequest(extent, MajorCompactionReason.CHOP, "file1", 10, "file4", 10); - s.gatherInformation(request); - plan = s.getCompactionPlan(request); - assertEquals(1, plan.passes.size()); - pass = plan.passes.get(0); - assertEquals(1, pass.inputFiles.size()); - assertEquals(1, pass.outputFiles); - assertEquals(asStringSet(pass.inputFiles), asSet("file4".split(","))); - assertTrue(plan.propogateDeletes); - - // file without first/last keys - request = createRequest(extent, MajorCompactionReason.CHOP, "file1", 10, "file5", 10); - s.gatherInformation(request); - plan = s.getCompactionPlan(request); - assertEquals(1, plan.passes.size()); - pass = plan.passes.get(0); - assertEquals(1, pass.inputFiles.size()); - assertEquals(1, pass.outputFiles); - assertEquals(asStringSet(pass.inputFiles), asSet("file5".split(","))); - assertTrue(plan.propogateDeletes); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c16b6ae4/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java index 179bca7..e77561f 100644 --- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java @@ -545,9 +545,9 @@ public class ShellServerIT extends SimpleMacIT { exec("addsplits row5 row7"); make10(); exec("flush -w -t t"); - assertTrue(base + 3 == countFiles()); + assertEquals(base + 3, countFiles()); exec("deleterows -t t -b row5 -e row7", true); - assertTrue(base + 2 == countFiles()); + assertEquals(base + 2, countFiles()); exec("deletetable -f t"); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c16b6ae4/test/src/test/java/org/apache/accumulo/test/TestConfigurableMajorCompactionIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/TestConfigurableMajorCompactionIT.java b/test/src/test/java/org/apache/accumulo/test/TestConfigurableMajorCompactionIT.java index 1d33666..13f81c5 100644 --- a/test/src/test/java/org/apache/accumulo/test/TestConfigurableMajorCompactionIT.java +++ b/test/src/test/java/org/apache/accumulo/test/TestConfigurableMajorCompactionIT.java @@ -37,10 +37,8 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.minicluster.MiniAccumuloConfig; -import org.apache.accumulo.server.tabletserver.compaction.CompactionPass; import org.apache.accumulo.server.tabletserver.compaction.CompactionPlan; import org.apache.accumulo.server.tabletserver.compaction.CompactionStrategy; -import org.apache.accumulo.server.tabletserver.compaction.DefaultWriter; import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionRequest; import org.apache.accumulo.test.functional.ConfigurableMacIT; import org.junit.Test; @@ -62,22 +60,11 @@ public class TestConfigurableMajorCompactionIT extends ConfigurableMacIT { @Override public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException { CompactionPlan plan = new CompactionPlan(); - if (request.getFiles().size() == 5) { - CompactionPass pass = new CompactionPass(); - pass.inputFiles.addAll(request.getFiles().keySet()); - plan.passes.add(pass); - } + plan.inputFiles.addAll(request.getFiles().keySet()); return plan; } - - @Override - public Writer getCompactionWriter() { - return new DefaultWriter(); - } } - - @Test(timeout = 20 * 1000) public void test() throws Exception { Connector conn = getConnector();