Repository: accumulo Updated Branches: refs/heads/master 433b6df06 -> 2f788f482
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java index b3037d3..db8bbfe 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java @@ -18,10 +18,6 @@ package org.apache.accumulo.master.tableOps; import static java.nio.charset.StandardCharsets.UTF_8; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -34,6 +30,7 @@ import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.CompactionStrategyConfig; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; @@ -55,10 +52,10 @@ import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator; import org.apache.accumulo.master.Master; import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.tableOps.UserCompactionConfig; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import 
org.apache.log4j.Logger; import org.apache.thrift.TException; @@ -207,95 +204,17 @@ public class CompactRange extends MasterRepo { private final String tableId; private byte[] startRow; private byte[] endRow; - private byte[] iterators; + private byte[] config; - public static class CompactionIterators implements Writable { - byte[] startRow; - byte[] endRow; - List<IteratorSetting> iterators; - public CompactionIterators(byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) { - this.startRow = startRow; - this.endRow = endRow; - this.iterators = iterators; - } - - public CompactionIterators() { - startRow = null; - endRow = null; - iterators = Collections.emptyList(); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeBoolean(startRow != null); - if (startRow != null) { - out.writeInt(startRow.length); - out.write(startRow); - } - - out.writeBoolean(endRow != null); - if (endRow != null) { - out.writeInt(endRow.length); - out.write(endRow); - } - - out.writeInt(iterators.size()); - for (IteratorSetting is : iterators) { - is.write(out); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - if (in.readBoolean()) { - startRow = new byte[in.readInt()]; - in.readFully(startRow); - } else { - startRow = null; - } - - if (in.readBoolean()) { - endRow = new byte[in.readInt()]; - in.readFully(endRow); - } else { - endRow = null; - } - - int num = in.readInt(); - iterators = new ArrayList<IteratorSetting>(num); - - for (int i = 0; i < num; i++) { - iterators.add(new IteratorSetting(in)); - } - } - - public Text getEndRow() { - if (endRow == null) - return null; - return new Text(endRow); - } - - public Text getStartRow() { - if (startRow == null) - return null; - return new Text(startRow); - } - - public List<IteratorSetting> getIterators() { - return iterators; - } - } - - public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) throws 
ThriftTableOperationException { + public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators, CompactionStrategyConfig compactionStrategy) + throws ThriftTableOperationException { this.tableId = tableId; this.startRow = startRow.length == 0 ? null : startRow; this.endRow = endRow.length == 0 ? null : endRow; - if (iterators.size() > 0) { - this.iterators = WritableUtils.toByteArray(new CompactionIterators(this.startRow, this.endRow, iterators)); - } else { - iterators = null; + if (iterators.size() > 0 || compactionStrategy != null) { + this.config = WritableUtils.toByteArray(new UserCompactionConfig(this.startRow, this.endRow, iterators, compactionStrategy)); } if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new Text(endRow)) >= 0) @@ -337,12 +256,12 @@ public class CompactRange extends MasterRepo { StringBuilder encodedIterators = new StringBuilder(); - if (iterators != null) { + if (config != null) { Hex hex = new Hex(); encodedIterators.append(","); encodedIterators.append(txidString); encodedIterators.append("="); - encodedIterators.append(new String(hex.encode(iterators), UTF_8)); + encodedIterators.append(new String(hex.encode(config), UTF_8)); } return (Long.toString(flushID) + encodedIterators).getBytes(UTF_8); http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index d4447ab..f9f5b4c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -166,6 +166,7 @@ import org.apache.accumulo.server.master.state.TabletLocationState; import 
org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; import org.apache.accumulo.server.master.state.TabletStateStore; import org.apache.accumulo.server.master.state.ZooTabletStateStore; +import org.apache.accumulo.server.master.tableOps.UserCompactionConfig; import org.apache.accumulo.server.problems.ProblemReport; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.replication.ZooKeeperInitialization; @@ -1643,19 +1644,19 @@ public class TabletServer extends AccumuloServerContext implements Runnable { tabletsToCompact.add(tablet); } - Long compactionId = null; + Pair<Long,UserCompactionConfig> compactionInfo = null; for (Tablet tablet : tabletsToCompact) { // all for the same table id, so only need to read // compaction id once - if (compactionId == null) + if (compactionInfo == null) try { - compactionId = tablet.getCompactionID().getFirst(); + compactionInfo = tablet.getCompactionID(); } catch (NoNodeException e) { log.info("Asked to compact table with no compaction id " + ke + " " + e.getMessage()); return; } - tablet.compactAll(compactionId); + tablet.compactAll(compactionInfo.getFirst(), compactionInfo.getSecond()); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 6c31fab..6b2eaf0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -16,8 +16,6 @@ */ package org.apache.accumulo.tserver; -import static com.google.common.base.Preconditions.checkNotNull; - import 
java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -62,6 +60,8 @@ import org.apache.accumulo.tserver.compaction.MajorCompactionRequest; import org.apache.accumulo.tserver.tablet.Tablet; import org.apache.log4j.Logger; +import static com.google.common.base.Preconditions.checkNotNull; + /** * ResourceManager is responsible for managing the resources of all tablets within a tablet server. * @@ -638,11 +638,13 @@ public class TabletServerResourceManager { request.setFiles(tabletFiles); try { return strategy.shouldCompact(request); - } catch (IOException ex) { - return false; + } catch (IOException e) { + throw new RuntimeException(e); } } + + // END methods that Tablets call to make decisions about major compaction // tablets call this method to run minor compactions, http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java index 6f69fb0..75c6bd8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java @@ -22,9 +22,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.accumulo.server.fs.FileRef; - import com.google.common.collect.Sets; +import org.apache.accumulo.server.fs.FileRef; /** * A plan for a compaction: the input files, the files that are *not* inputs to a compaction that should simply be deleted, and the optional parameters used to http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java 
---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java index 7bc1a80..2d94884 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java @@ -43,6 +43,12 @@ public abstract class CompactionStrategy { * {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state stored during shouldCompact will no longer exist when * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)} are called. * + * <P> + * Called while holding the tablet lock, so it should not be doing any blocking. + * + * <P> + * Since no blocking should be done in this method, it is unexpected that this method will throw IOException. However, since it is in the API, it cannot be + * easily removed. */ public abstract boolean shouldCompact(MajorCompactionRequest request) throws IOException; @@ -58,6 +64,10 @@ public abstract class CompactionStrategy { /** * Get the plan for compacting a tablets files. Called while holding the tablet lock, so it should not be doing any blocking. * + * <P> + * Since no blocking should be done in this method, it is unexpected that this method will throw IOException. However, since it is in the API, it cannot be + * easily removed. + * * @param request * basic details about the tablet * @return the plan for a major compaction, or null to cancel the compaction. 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategy.java index 8b03d17..1f0dc3a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategy.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategy.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.tserver.compaction; -import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -31,13 +30,13 @@ import org.apache.accumulo.server.fs.FileRef; public class DefaultCompactionStrategy extends CompactionStrategy { @Override - public boolean shouldCompact(MajorCompactionRequest request) throws IOException { + public boolean shouldCompact(MajorCompactionRequest request) { CompactionPlan plan = getCompactionPlan(request); return plan != null && !plan.inputFiles.isEmpty(); } @Override - public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException { + public CompactionPlan getCompactionPlan(MajorCompactionRequest request) { CompactionPlan result = new CompactionPlan(); List<FileRef> toCompact = findMapFilesToCompact(request); http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java 
new file mode 100644 index 0000000..9295c30 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.tserver.compaction; + +import java.io.IOException; + +/** + * The default compaction strategy for user initiated compactions. This strategy will always select all files. 
+ */ + +public class EverythingCompactionStrategy extends CompactionStrategy { + + @Override + public boolean shouldCompact(MajorCompactionRequest request) throws IOException { + return request.getFiles().size() > 0; + } + + @Override + public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException { + CompactionPlan plan = new CompactionPlan(); + plan.inputFiles.addAll(request.getFiles().keySet()); + return plan; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java index 478939a..6d4dc79 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java @@ -53,7 +53,7 @@ public class SizeLimitCompactionStrategy extends DefaultCompactionStrategy { } @Override - public boolean shouldCompact(MajorCompactionRequest request) throws IOException { + public boolean shouldCompact(MajorCompactionRequest request) { return super.shouldCompact(filterFiles(request)); } @@ -63,7 +63,7 @@ public class SizeLimitCompactionStrategy extends DefaultCompactionStrategy { } @Override - public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException { + public CompactionPlan getCompactionPlan(MajorCompactionRequest request) { return super.getCompactionPlan(filterFiles(request)); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index bc55c4f..bc75062 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Durability; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.admin.CompactionStrategyConfig; import org.apache.accumulo.core.client.impl.DurabilityImpl; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -96,10 +97,11 @@ import org.apache.accumulo.server.fs.VolumeManager.FileType; import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles; import org.apache.accumulo.server.master.state.TServerInstance; -import org.apache.accumulo.server.master.tableOps.CompactionIterators; +import org.apache.accumulo.server.master.tableOps.UserCompactionConfig; import org.apache.accumulo.server.problems.ProblemReport; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; +import org.apache.accumulo.server.security.SystemCredentials; import org.apache.accumulo.server.tablets.TabletTime; import org.apache.accumulo.server.tablets.UniqueNameAllocator; import org.apache.accumulo.server.util.FileUtil; @@ -1152,7 +1154,7 @@ public class Tablet implements TabletCommitter { } } - public Pair<Long,List<IteratorSetting>> getCompactionID() throws NoNodeException { + public Pair<Long,UserCompactionConfig> getCompactionID() throws NoNodeException { try { String zTablePath = Constants.ZROOT + "/" + tabletServer.getInstance().getInstanceID() + Constants.ZTABLES + "/" + 
extent.getTableId() + Constants.ZTABLE_COMPACT_ID; @@ -1160,7 +1162,7 @@ public class Tablet implements TabletCommitter { String[] tokens = new String(ZooReaderWriter.getInstance().getData(zTablePath, null), UTF_8).split(","); long compactID = Long.parseLong(tokens[0]); - CompactionIterators iters = new CompactionIterators(); + UserCompactionConfig compactionConfig = new UserCompactionConfig(); if (tokens.length > 1) { Hex hex = new Hex(); @@ -1168,20 +1170,20 @@ public class Tablet implements TabletCommitter { DataInputStream dis = new DataInputStream(bais); try { - iters.readFields(dis); + compactionConfig.readFields(dis); } catch (IOException e) { throw new RuntimeException(e); } - KeyExtent ke = new KeyExtent(extent.getTableId(), iters.getEndRow(), iters.getStartRow()); + KeyExtent ke = new KeyExtent(extent.getTableId(), compactionConfig.getEndRow(), compactionConfig.getStartRow()); if (!ke.overlaps(extent)) { // only use iterators if compaction range overlaps - iters = new CompactionIterators(); + compactionConfig = new UserCompactionConfig(); } } - return new Pair<Long,List<IteratorSetting>>(compactID, iters.getIterators()); + return new Pair<Long,UserCompactionConfig>(compactID, compactionConfig); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (NumberFormatException nfe) { @@ -1780,21 +1782,34 @@ public class Tablet implements TabletCommitter { long t1, t2, t3; - // acquire file info outside of tablet lock - CompactionStrategy strategy = Property.createTableInstanceFromPropertyName(tableConfiguration, Property.TABLE_COMPACTION_STRATEGY, - CompactionStrategy.class, new DefaultCompactionStrategy()); - strategy.init(Property.getCompactionStrategyOptions(tableConfiguration)); - + Pair<Long,UserCompactionConfig> compactionId = null; + CompactionStrategy strategy = null; Map<FileRef,Pair<Key,Key>> firstAndLastKeys = null; - if (reason == MajorCompactionReason.CHOP) { + + if(reason == MajorCompactionReason.USER){ + try { + compactionId = 
getCompactionID(); + strategy = createCompactionStrategy(compactionId.getSecond().getCompactionStrategy()); + } catch (NoNodeException e) { + throw new RuntimeException(e); + } + } else if (reason == MajorCompactionReason.NORMAL || reason == MajorCompactionReason.IDLE) { + strategy = Property.createTableInstanceFromPropertyName(tableConfiguration, Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class, + new DefaultCompactionStrategy()); + strategy.init(Property.getCompactionStrategyOptions(tableConfiguration)); + } else if (reason == MajorCompactionReason.CHOP) { firstAndLastKeys = getFirstAndLastKeys(getDatafileManager().getDatafileSizes()); - } else if (reason != MajorCompactionReason.USER) { + } else { + throw new IllegalArgumentException("Unknown compaction reason " + reason); + } + + if (strategy != null) { MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, getTabletServer().getFileSystem(), tableConfiguration); request.setFiles(getDatafileManager().getDatafileSizes()); strategy.gatherInformation(request); } - Map<FileRef,DataFileValue> filesToCompact; + Map<FileRef,DataFileValue> filesToCompact = null; int maxFilesToCompact = tableConfiguration.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN); @@ -1802,6 +1817,7 @@ public class Tablet implements TabletCommitter { CompactionPlan plan = null; boolean propogateDeletes = false; + boolean updateCompactionID = false; synchronized (this) { // plan all that work that needs to be done in the sync block... 
then do the actual work @@ -1831,8 +1847,6 @@ public class Tablet implements TabletCommitter { if (reason == MajorCompactionReason.CHOP) { // enforce rules: files with keys outside our range need to be compacted inputFiles.addAll(findChopFiles(extent, firstAndLastKeys, allFiles.keySet())); - } else if (reason == MajorCompactionReason.USER) { - inputFiles.addAll(allFiles.keySet()); } else { MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, fs, tableConfiguration); request.setFiles(allFiles); @@ -1844,32 +1858,48 @@ public class Tablet implements TabletCommitter { } if (inputFiles.isEmpty()) { - return majCStats; + if (reason == MajorCompactionReason.USER) { + // no work to do + lastCompactID = compactionId.getFirst(); + updateCompactionID = true; + } else { + return majCStats; + } + } else { + // If no original files will exist at the end of the compaction, we do not have to propogate deletes + Set<FileRef> droppedFiles = new HashSet<FileRef>(); + droppedFiles.addAll(inputFiles); + if (plan != null) + droppedFiles.addAll(plan.deleteFiles); + propogateDeletes = !(droppedFiles.equals(allFiles.keySet())); + log.debug("Major compaction plan: " + plan + " propogate deletes : " + propogateDeletes); + filesToCompact = new HashMap<FileRef,DataFileValue>(allFiles); + filesToCompact.keySet().retainAll(inputFiles); + + getDatafileManager().reserveMajorCompactingFiles(filesToCompact.keySet()); } - // If no original files will exist at the end of the compaction, we do not have to propogate deletes - Set<FileRef> droppedFiles = new HashSet<FileRef>(); - droppedFiles.addAll(inputFiles); - if (plan != null) - droppedFiles.addAll(plan.deleteFiles); - propogateDeletes = !(droppedFiles.equals(allFiles.keySet())); - log.debug("Major compaction plan: " + plan + " propogate deletes : " + propogateDeletes); - filesToCompact = new HashMap<FileRef,DataFileValue>(allFiles); - filesToCompact.keySet().retainAll(inputFiles); t3 = System.currentTimeMillis(); - - 
getDatafileManager().reserveMajorCompactingFiles(filesToCompact.keySet()); } try { log.debug(String.format("MajC initiate lock %.2f secs, wait %.2f secs", (t3 - t2) / 1000.0, (t2 - t1) / 1000.0)); - Pair<Long,List<IteratorSetting>> compactionId = null; - if (!propogateDeletes) { + if (updateCompactionID) { + MetadataTableUtil.updateTabletCompactID(extent, compactionId.getFirst(),tabletServer, getTabletServer().getLock()); + return majCStats; + } + + if (!propogateDeletes && compactionId == null) { // compacting everything, so update the compaction id in metadata try { compactionId = getCompactionID(); + if (compactionId.getSecond().getCompactionStrategy() != null) { + compactionId = null; + // TODO maybe return unless chop? + } + } catch (NoNodeException e) { throw new RuntimeException(e); } @@ -1890,7 +1920,7 @@ public class Tablet implements TabletCommitter { } } - compactionIterators = compactionId.getSecond(); + compactionIterators = compactionId.getSecond().getIterators(); } // need to handle case where only one file is being major compacted @@ -2495,7 +2525,24 @@ public class Tablet implements TabletCommitter { initiateMajorCompaction(MajorCompactionReason.CHOP); } - public void compactAll(long compactionId) { + private CompactionStrategy createCompactionStrategy(CompactionStrategyConfig strategyConfig) { + String context = tableConfiguration.get(Property.TABLE_CLASSPATH); + String clazzName = strategyConfig.getClassName(); + try { + Class<? 
extends CompactionStrategy> clazz; + if (context != null && !context.equals("")) + clazz = AccumuloVFSClassLoader.getContextManager().loadClass(context, clazzName, CompactionStrategy.class); + else + clazz = AccumuloVFSClassLoader.loadClass(clazzName, CompactionStrategy.class); + CompactionStrategy strategy = clazz.newInstance(); + strategy.init(strategyConfig.getOptions()); + return strategy; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void compactAll(long compactionId, UserCompactionConfig compactionConfig) { boolean updateMetadata = false; synchronized (this) { @@ -2522,8 +2569,25 @@ public class Tablet implements TabletCommitter { majorCompactionState = CompactionState.IN_PROGRESS; updateMetadata = true; lastCompactID = compactionId; - } else - initiateMajorCompaction(MajorCompactionReason.USER); + } else { + CompactionStrategyConfig strategyConfig = compactionConfig.getCompactionStrategy(); + CompactionStrategy strategy = createCompactionStrategy(strategyConfig); + + MajorCompactionRequest request = new MajorCompactionRequest(extent, MajorCompactionReason.USER, getTabletServer().getFileSystem(), tableConfiguration); + request.setFiles(getDatafileManager().getDatafileSizes()); + + try { + if (strategy.shouldCompact(request)) { + initiateMajorCompaction(MajorCompactionReason.USER); + } else { + majorCompactionState = CompactionState.IN_PROGRESS; + updateMetadata = true; + lastCompactID = compactionId; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } } if (updateMetadata) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java index 80dd9ba..660630e 100644 --- 
a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java @@ -17,28 +17,27 @@ package org.apache.accumulo.shell.commands; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.CompactionStrategyConfig; import org.apache.accumulo.shell.Shell; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.hadoop.io.Text; public class CompactCommand extends TableOperation { - private Option noFlushOption, waitOpt, profileOpt, cancelOpt; - private boolean flush; - private Text startRow; - private Text endRow; - private List<IteratorSetting> iterators; + private Option noFlushOption, waitOpt, profileOpt, cancelOpt, strategyOpt, strategyConfigOpt; + + private CompactionConfig compactionConfig = null; boolean override = false; - private boolean wait; private boolean cancel = false; @@ -59,13 +58,13 @@ public class CompactCommand extends TableOperation { } } else { try { - if (wait) { + if (compactionConfig.getWait()) { Shell.log.info("Compacting table ..."); } - shellState.getConnector().tableOperations().compact(tableName, startRow, endRow, iterators, flush, wait); + shellState.getConnector().tableOperations().compact(tableName, compactionConfig); - Shell.log.info("Compaction of table " + tableName + " " + (wait ? "completed" : "started") + " for given range"); + Shell.log.info("Compaction of table " + tableName + " " + (compactionConfig.getWait() ? 
"completed" : "started") + " for given range"); } catch (Exception ex) { throw new AccumuloException(ex); } @@ -85,10 +84,12 @@ public class CompactCommand extends TableOperation { cancel = false; } - flush = !cl.hasOption(noFlushOption.getOpt()); - startRow = OptUtil.getStartRow(cl); - endRow = OptUtil.getEndRow(cl); - wait = cl.hasOption(waitOpt.getOpt()); + compactionConfig = new CompactionConfig(); + + compactionConfig.setFlush(!cl.hasOption(noFlushOption.getOpt())); + compactionConfig.setWait(cl.hasOption(waitOpt.getOpt())); + compactionConfig.setStartRow(OptUtil.getStartRow(cl)); + compactionConfig.setEndRow(OptUtil.getEndRow(cl)); if (cl.hasOption(profileOpt.getOpt())) { List<IteratorSetting> iterators = shellState.iteratorProfiles.get(cl.getOptionValue(profileOpt.getOpt())); @@ -97,11 +98,24 @@ public class CompactCommand extends TableOperation { return -1; } - this.iterators = new ArrayList<IteratorSetting>(iterators); - } else { - this.iterators = Collections.emptyList(); + compactionConfig.setIterators(new ArrayList<>(iterators)); } + if (cl.hasOption(strategyOpt.getOpt())) { + CompactionStrategyConfig csc = new CompactionStrategyConfig(cl.getOptionValue(strategyOpt.getOpt())); + if (cl.hasOption(strategyConfigOpt.getOpt())) { + Map<String,String> props = new HashMap<>(); + String[] keyVals = cl.getOptionValue(strategyConfigOpt.getOpt()).split(","); + for (String keyVal : keyVals) { + String[] sa = keyVal.split("="); + props.put(sa[0], sa[1]); + } + + csc.setOptions(props); + } + + compactionConfig.setCompactionStrategy(csc); + } return super.execute(fullCommand, cl, shellState); } @@ -121,6 +135,11 @@ public class CompactCommand extends TableOperation { profileOpt.setArgName("profile"); opts.addOption(profileOpt); + strategyOpt = new Option("s", "strategy", true, "compaction strategy class name"); + opts.addOption(strategyOpt); + strategyConfigOpt = new Option("sc", "strategyConfig", true, "Key value options for compaction strategy. 
Expects <prop>=<value>{,<prop>=<value>}"); + opts.addOption(strategyConfigOpt); + cancelOpt = new Option(null, "cancel", false, "cancel user initiated compactions"); opts.addOption(cancelOpt); http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java b/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java index 50e53a9..c075075 100644 --- a/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java +++ b/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java @@ -69,6 +69,7 @@ import org.apache.accumulo.proxy.thrift.BatchScanOptions; import org.apache.accumulo.proxy.thrift.Column; import org.apache.accumulo.proxy.thrift.ColumnUpdate; import org.apache.accumulo.proxy.thrift.CompactionReason; +import org.apache.accumulo.proxy.thrift.CompactionStrategyConfig; import org.apache.accumulo.proxy.thrift.CompactionType; import org.apache.accumulo.proxy.thrift.Condition; import org.apache.accumulo.proxy.thrift.ConditionalStatus; @@ -245,7 +246,7 @@ public class SimpleProxyIT { fail("exception not thrown"); } catch (TException ex) {} try { - client.compactTable(badLogin, table, null, null, null, true, false); + client.compactTable(badLogin, table, null, null, null, true, false, null); fail("exception not thrown"); } catch (AccumuloSecurityException ex) {} try { @@ -531,7 +532,7 @@ public class SimpleProxyIT { fail("exception not thrown"); } catch (TableNotFoundException ex) {} try { - client.compactTable(creds, doesNotExist, null, null, null, true, false); + client.compactTable(creds, doesNotExist, null, null, null, true, false, null); fail("exception not thrown"); } catch (TableNotFoundException ex) {} try { @@ -874,7 +875,7 @@ public class SimpleProxyIT { public void run() { try { Client client2 = new TestProxyClient("localhost", 
proxyPort, protocolClass.newInstance()).proxy(); - client2.compactTable(creds, "slow", null, null, null, true, true); + client2.compactTable(creds, "slow", null, null, null, true, true, null); } catch (Exception e) { throw new RuntimeException(e); } @@ -1126,7 +1127,7 @@ public class SimpleProxyIT { client.clearLocatorCache(creds, TABLE_TEST); // compact - client.compactTable(creds, TABLE_TEST, null, null, null, true, true); + client.compactTable(creds, TABLE_TEST, null, null, null, true, true, null); assertEquals(1, countFiles(TABLE_TEST)); assertScan(expected, TABLE_TEST); @@ -1141,7 +1142,7 @@ public class SimpleProxyIT { assertEquals(2, diskUsage.size()); assertEquals(1, diskUsage.get(0).getTables().size()); assertEquals(2, diskUsage.get(1).getTables().size()); - client.compactTable(creds, TABLE_TEST2, null, null, null, true, true); + client.compactTable(creds, TABLE_TEST2, null, null, null, true, true, null); diskUsage = (client.getDiskUsage(creds, tablesToScan)); assertEquals(3, diskUsage.size()); assertEquals(1, diskUsage.get(0).getTables().size()); @@ -1591,4 +1592,40 @@ public class SimpleProxyIT { assertEquals(range.start.timestamp, range.start.timestamp); assertEquals(range.stop.timestamp, range.stop.timestamp); } + + @Test + public void testCompactionStrategy() throws Exception { + final String tableName = makeTableName(); + + client.createTable(creds, tableName, true, TimeType.MILLIS); + + client.setProperty(creds, Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "context1", + System.getProperty("user.dir") + "/src/test/resources/TestCompactionStrat.jar"); + client.setTableProperty(creds, tableName, Property.TABLE_CLASSPATH.getKey(), "context1"); + + client.addSplits(creds, tableName, Collections.singleton(s2bb("efg"))); + + client.updateAndFlush(creds, tableName, mutation("a", "cf", "cq", "v1")); + client.flushTable(creds, tableName, null, null, true); + + client.updateAndFlush(creds, tableName, mutation("b", "cf", "cq", "v2")); + 
client.flushTable(creds, tableName, null, null, true); + + client.updateAndFlush(creds, tableName, mutation("y", "cf", "cq", "v1")); + client.flushTable(creds, tableName, null, null, true); + + client.updateAndFlush(creds, tableName, mutation("z", "cf", "cq", "v2")); + client.flushTable(creds, tableName, null, null, true); + + assertEquals(4, countFiles(tableName)); + + CompactionStrategyConfig csc = new CompactionStrategyConfig(); + + // The EfgCompactionStrat will only compact tablets with an end row of efg + csc.setClassName("org.apache.accumulo.test.EfgCompactionStrat"); + + client.compactTable(creds, tableName, null, null, null, true, true, csc); + + assertEquals(3, countFiles(tableName)); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java index 544bb76..d878c7f 100644 --- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java @@ -54,8 +54,9 @@ import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.shell.Shell; import org.apache.accumulo.harness.SharedMiniClusterIT; +import org.apache.accumulo.shell.Shell; +import org.apache.accumulo.test.UserCompactionStrategyIT.TestCompactionStrategy; import org.apache.accumulo.test.functional.FunctionalTestUtils; import org.apache.accumulo.test.functional.SlowIterator; import org.apache.accumulo.tracer.TraceServer; @@ -725,7 +726,7 @@ public class ShellServerIT extends SharedMiniClusterIT { // make two more files: ts.exec("insert m 1 2 3"); ts.exec("flush -w"); - 
ts.exec("insert n 1 2 3"); + ts.exec("insert n 1 2 v901"); ts.exec("flush -w"); List<String> oldFiles = getFiles(tableId); @@ -740,6 +741,14 @@ public class ShellServerIT extends SharedMiniClusterIT { ts.exec("merge --all -t " + table); ts.exec("compact -w"); assertEquals(1, countFiles(tableId)); + + // test compaction strategy + ts.exec("insert z 1 2 v900"); + ts.exec("compact -w -s " + TestCompactionStrategy.class.getName() + " -sc inputPrefix=F,dropPrefix=A"); + assertEquals(1, countFiles(tableId)); + ts.exec("scan", true, "v900", true); + ts.exec("scan", true, "v901", false); + ts.exec("deletetable -f " + table); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java new file mode 100644 index 0000000..5421f52 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.CompactionStrategyConfig; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.RegExFilter; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterIT; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.test.functional.FunctionalTestUtils; +import org.apache.accumulo.tserver.compaction.CompactionPlan; +import org.apache.accumulo.tserver.compaction.CompactionStrategy; +import org.apache.accumulo.tserver.compaction.MajorCompactionRequest; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +public class UserCompactionStrategyIT extends AccumuloClusterIT { + + public static class SizeCompactionStrategy extends CompactionStrategy { + + private long size = 0; + + @Override + public void init(Map<String,String> options) { + size = Long.parseLong(options.get("size")); + } + + @Override + public boolean shouldCompact(MajorCompactionRequest 
request) throws IOException { + + for (DataFileValue dfv : request.getFiles().values()) + if (dfv.getSize() < size) + return true; + + return false; + } + + @Override + public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException { + CompactionPlan plan = new CompactionPlan(); + + for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) + if (entry.getValue().getSize() < size) + plan.inputFiles.add(entry.getKey()); + + return plan; + } + + } + + public static class TestCompactionStrategy extends CompactionStrategy { + + private String inputPrefix = "Z"; + private String dropPrefix = "Z"; + private boolean shouldCompact = false; + + @Override + public void init(Map<String,String> options) { + if (options.containsKey("inputPrefix")) + inputPrefix = options.get("inputPrefix"); + if (options.containsKey("dropPrefix")) + dropPrefix = options.get("dropPrefix"); + if (options.containsKey("shouldCompact")) + shouldCompact = Boolean.parseBoolean(options.get("shouldCompact")); + } + + @Override + public boolean shouldCompact(MajorCompactionRequest request) throws IOException { + if (shouldCompact) + return true; + + for (FileRef fref : request.getFiles().keySet()) { + if (fref.path().getName().startsWith(inputPrefix)) + return true; + if (fref.path().getName().startsWith(dropPrefix)) + return true; + } + + return false; + } + + @Override + public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException { + CompactionPlan plan = new CompactionPlan(); + + for (FileRef fref : request.getFiles().keySet()) { + if (fref.path().getName().startsWith(dropPrefix)) { + plan.deleteFiles.add(fref); + } else if (fref.path().getName().startsWith(inputPrefix)) { + plan.inputFiles.add(fref); + } + } + + return plan; + } + } + + @Test + public void testDropA() throws Exception { + Connector c = getConnector(); + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + writeFlush(c, 
tableName, "a"); + writeFlush(c, tableName, "b"); + // create a file that starts with A containing rows 'a' and 'b' + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + writeFlush(c, tableName, "c"); + writeFlush(c, tableName, "d"); + + // drop files that start with A + CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName()); + csConfig.setOptions(ImmutableMap.of("dropPrefix", "A", "inputPrefix", "F")); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig)); + + Assert.assertEquals(ImmutableSet.of("c", "d"), getRows(c, tableName)); + + // this compaction should not drop files starting with A + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + Assert.assertEquals(ImmutableSet.of("c", "d"), getRows(c, tableName)); + } + + private void testDropNone(Map<String,String> options) throws Exception { + + Connector c = getConnector(); + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + writeFlush(c, tableName, "a"); + writeFlush(c, tableName, "b"); + + CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName()); + csConfig.setOptions(options); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig)); + + Assert.assertEquals(ImmutableSet.of("a", "b"), getRows(c, tableName)); + } + + @Test + public void testDropNone() throws Exception { + // test a compaction strategy that selects no files. In this case there is no work to do, want to ensure it does not hang. + + testDropNone(ImmutableMap.of("inputPrefix", "Z")); + } + + @Test + public void testDropNone2() throws Exception { + // test a compaction strategy that selects no files. 
This differs from testDropNone() in that shouldCompact() will return true and getCompactionPlan() will + // return no work to do. + + testDropNone(ImmutableMap.of("inputPrefix", "Z", "shouldCompact", "true")); + } + + @Test + public void testPerTableClasspath() throws Exception { + // test pertable classpath + user specified compaction strat + + final Connector c = getConnector(); + final String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.instanceOperations().setProperty(Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "context1", + System.getProperty("user.dir") + "/src/test/resources/TestCompactionStrat.jar"); + c.tableOperations().setProperty(tableName, Property.TABLE_CLASSPATH.getKey(), "context1"); + + c.tableOperations().addSplits(tableName, new TreeSet<Text>(Arrays.asList(new Text("efg")))); + + writeFlush(c, tableName, "a"); + writeFlush(c, tableName, "b"); + + writeFlush(c, tableName, "h"); + writeFlush(c, tableName, "i"); + + Assert.assertEquals(4, FunctionalTestUtils.countRFiles(c, tableName)); + + // EfgCompactionStrat will only compact a tablet w/ end row of 'efg'. No other tablets are compacted. 
+ CompactionStrategyConfig csConfig = new CompactionStrategyConfig("org.apache.accumulo.test.EfgCompactionStrat"); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig)); + + Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName)); + + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + Assert.assertEquals(2, FunctionalTestUtils.countRFiles(c, tableName)); + } + + @Test + public void testIterators() throws Exception { + // test compaction strategy + iterators + + Connector c = getConnector(); + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + writeFlush(c, tableName, "a"); + writeFlush(c, tableName, "b"); + // create a file that starts with A containing rows 'a' and 'b' + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + writeFlush(c, tableName, "c"); + writeFlush(c, tableName, "d"); + + Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName)); + + // drop files that start with A + CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName()); + csConfig.setOptions(ImmutableMap.of("inputPrefix", "F")); + + IteratorSetting iterConf = new IteratorSetting(21, "myregex", RegExFilter.class); + RegExFilter.setRegexs(iterConf, "a|c", null, null, null, false); + + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig).setIterators(Arrays.asList(iterConf))); + + // compaction strategy should only be applied to one file. If its applied to both, then row 'b' would be dropped by filter. 
+ Assert.assertEquals(ImmutableSet.of("a", "b", "c"), getRows(c, tableName)); + + Assert.assertEquals(2, FunctionalTestUtils.countRFiles(c, tableName)); + + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + // ensure that iterator is not applied + Assert.assertEquals(ImmutableSet.of("a", "b", "c"), getRows(c, tableName)); + + Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName)); + } + + @Test + public void testFileSize() throws Exception { + Connector c = getConnector(); + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + // write random data because its very unlikely it will compress + writeRandomValue(c, tableName, 1 << 16); + writeRandomValue(c, tableName, 1 << 16); + + writeRandomValue(c, tableName, 1 << 9); + writeRandomValue(c, tableName, 1 << 7); + writeRandomValue(c, tableName, 1 << 6); + + Assert.assertEquals(5, FunctionalTestUtils.countRFiles(c, tableName)); + + CompactionStrategyConfig csConfig = new CompactionStrategyConfig(SizeCompactionStrategy.class.getName()); + csConfig.setOptions(ImmutableMap.of("size", "" + (1 << 15))); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig)); + + Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName)); + + csConfig = new CompactionStrategyConfig(SizeCompactionStrategy.class.getName()); + csConfig.setOptions(ImmutableMap.of("size", "" + (1 << 17))); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig)); + + Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName)); + + } + + void writeRandomValue(Connector c, String tableName, int size) throws Exception { + Random rand = new Random(); + + byte data1[] = new byte[size]; + rand.nextBytes(data1); + + BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + + Mutation m1 = new Mutation("r" + rand.nextInt(909090)); + 
m1.put("data", "bl0b", new Value(data1)); + + bw.addMutation(m1); + bw.close(); + c.tableOperations().flush(tableName, null, null, true); + } + + private Set<String> getRows(Connector c, String tableName) throws TableNotFoundException { + Set<String> rows = new HashSet<String>(); + Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY); + + for (Entry<Key,Value> entry : scanner) + rows.add(entry.getKey().getRowData().toString()); + return rows; + + } + + private void writeFlush(Connector conn, String tablename, String row) throws Exception { + BatchWriter bw = conn.createBatchWriter(tablename, new BatchWriterConfig()); + Mutation m = new Mutation(row); + m.put("", "", ""); + bw.addMutation(m); + bw.close(); + conn.tableOperations().flush(tablename, null, null, true); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index 4e0721b..e4e7229 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -16,8 +16,6 @@ */ package org.apache.accumulo.test.functional; -import static org.junit.Assert.assertFalse; - import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -49,8 +47,19 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import static org.junit.Assert.assertFalse; + public class FunctionalTestUtils { + public static int countRFiles(Connector c, String tableName) throws Exception { + Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + String tableId = 
c.tableOperations().tableIdMap().get(tableName); + scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId)); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + + return count(scanner); + } + static void checkRFiles(Connector c, String tableName, int minTablets, int maxTablets, int minRFiles, int maxRFiles) throws Exception { Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); String tableId = c.tableOperations().tableIdMap().get(tableName);
