Repository: accumulo Updated Branches: refs/heads/master 85d254e12 -> fad97f7a6
ACCUMULO-3645 Run user major compactions with iterators when tablets are empty. Removed/modified lines in Tablet that stop construction of the SKVI stack when a tablet has no data and there are iterators present and it is a user compaction. Includes HardListIterator, an iterator that generates data for testing purposes. Includes integration tests in accumulo-test. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/fad97f7a Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/fad97f7a Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/fad97f7a Branch: refs/heads/master Commit: fad97f7a667dd34dcdce576fc0a1ac3514db434b Parents: 85d254e Author: Dylan Hutchison <dhutc...@mit.edu> Authored: Tue Apr 14 16:49:20 2015 -0400 Committer: Dylan Hutchison <dhutc...@mit.edu> Committed: Tue Apr 14 16:52:31 2015 -0400 ---------------------------------------------------------------------- .../EverythingCompactionStrategy.java | 2 +- .../apache/accumulo/tserver/tablet/Tablet.java | 63 ++++----- .../apache/accumulo/test/HardListIterator.java | 115 +++++++++++++++++ .../apache/accumulo/test/TableOperationsIT.java | 127 +++++++++++++++++++ 4 files changed, 277 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/fad97f7a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java index 9295c30..2710177 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java +++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java @@ -27,7 +27,7 @@ public class EverythingCompactionStrategy extends CompactionStrategy { @Override public boolean shouldCompact(MajorCompactionRequest request) throws IOException { - return request.getFiles().size() > 0; + return true; // ACCUMULO-3645 compact for empty files too } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/fad97f7a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 2342789..7c152b0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1863,21 +1863,26 @@ public class Tablet implements TabletCommitter { if (inputFiles.isEmpty()) { if (reason == MajorCompactionReason.USER) { - // no work to do - lastCompactID = compactionId.getFirst(); - updateCompactionID = true; + if (compactionId.getSecond().getIterators().isEmpty()) { + log.debug("No-op major compaction by USER on 0 input files because no iterators present."); + lastCompactID = compactionId.getFirst(); + updateCompactionID = true; + } else { + log.debug("Major compaction by USER on 0 input files with iterators."); + filesToCompact = new HashMap<>(); + } } else { return majCStats; } } else { // If no original files will exist at the end of the compaction, we do not have to propogate deletes - Set<FileRef> droppedFiles = new HashSet<FileRef>(); + Set<FileRef> droppedFiles = new HashSet<>(); droppedFiles.addAll(inputFiles); if (plan != null) droppedFiles.addAll(plan.deleteFiles); propogateDeletes = !(droppedFiles.equals(allFiles.keySet())); log.debug("Major compaction plan: " + plan + " 
propogate deletes : " + propogateDeletes); - filesToCompact = new HashMap<FileRef,DataFileValue>(allFiles); + filesToCompact = new HashMap<>(allFiles); filesToCompact.keySet().retainAll(inputFiles); getDatafileManager().reserveMajorCompactingFiles(filesToCompact.keySet()); @@ -1916,6 +1921,7 @@ public class Tablet implements TabletCommitter { // compaction was canceled return majCStats; } + compactionIterators = compactionId.getSecond().getIterators(); synchronized (this) { if (lastCompactID >= compactionId.getFirst()) @@ -1924,12 +1930,11 @@ public class Tablet implements TabletCommitter { } } - compactionIterators = compactionId.getSecond().getIterators(); } // need to handle case where only one file is being major compacted - while (filesToCompact.size() > 0) { - + // ACCUMULO-3645 run loop at least once, even if filesToCompact.isEmpty() + do { int numToCompact = maxFilesToCompact; if (filesToCompact.size() > maxFilesToCompact && filesToCompact.size() < 2 * maxFilesToCompact) { @@ -1995,7 +2000,7 @@ public class Tablet implements TabletCommitter { span.stop(); } - } + } while (filesToCompact.size() > 0); return majCStats; } finally { synchronized (Tablet.this) { @@ -2025,6 +2030,13 @@ public class Tablet implements TabletCommitter { private Set<FileRef> removeSmallest(Map<FileRef,DataFileValue> filesToCompact, int maxFilesToCompact) { // ensure this method works properly when multiple files have the same size + // short-circuit; also handles zero files case + if (filesToCompact.size() <= maxFilesToCompact) { + Set<FileRef> smallestFiles = new HashSet<FileRef>(filesToCompact.keySet()); + filesToCompact.clear(); + return smallestFiles; + } + PriorityQueue<Pair<FileRef,Long>> fileHeap = new PriorityQueue<Pair<FileRef,Long>>(filesToCompact.size(), new Comparator<Pair<FileRef,Long>>() { @Override public int compare(Pair<FileRef,Long> o1, Pair<FileRef,Long> o2) { @@ -2565,29 +2577,22 @@ public class Tablet implements TabletCommitter { if (isClosing() || isClosed() || 
majorCompactionQueued.contains(MajorCompactionReason.USER) || isMajorCompactionRunning()) return; - if (getDatafileManager().getDatafileSizes().size() == 0) { - // no files, so jsut update the metadata table - majorCompactionState = CompactionState.IN_PROGRESS; - updateMetadata = true; - lastCompactID = compactionId; - } else { - CompactionStrategyConfig strategyConfig = compactionConfig.getCompactionStrategy(); - CompactionStrategy strategy = createCompactionStrategy(strategyConfig); + CompactionStrategyConfig strategyConfig = compactionConfig.getCompactionStrategy(); + CompactionStrategy strategy = createCompactionStrategy(strategyConfig); - MajorCompactionRequest request = new MajorCompactionRequest(extent, MajorCompactionReason.USER, getTabletServer().getFileSystem(), tableConfiguration); - request.setFiles(getDatafileManager().getDatafileSizes()); + MajorCompactionRequest request = new MajorCompactionRequest(extent, MajorCompactionReason.USER, getTabletServer().getFileSystem(), tableConfiguration); + request.setFiles(getDatafileManager().getDatafileSizes()); - try { - if (strategy.shouldCompact(request)) { - initiateMajorCompaction(MajorCompactionReason.USER); - } else { - majorCompactionState = CompactionState.IN_PROGRESS; - updateMetadata = true; - lastCompactID = compactionId; - } - } catch (IOException e) { - throw new RuntimeException(e); + try { + if (strategy.shouldCompact(request)) { + initiateMajorCompaction(MajorCompactionReason.USER); + } else { + majorCompactionState = CompactionState.IN_PROGRESS; + updateMetadata = true; + lastCompactID = compactionId; } + } catch (IOException e) { + throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/fad97f7a/test/src/main/java/org/apache/accumulo/test/HardListIterator.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/HardListIterator.java 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.test;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

/**
 * A SortedKeyValueIterator that ignores its parent source and instead emits a fixed, hardcoded
 * set of entries. Used to test running an iterator stack where no underlying data exists, e.g.
 * user-initiated major compactions on tablets with zero files (ACCUMULO-3645).
 */
public class HardListIterator implements SortedKeyValueIterator<Key,Value> {
  private static final Logger log = Logger.getLogger(HardListIterator.class);

  /** The entries every instance of this iterator emits; unmodifiable for safety. */
  public final static SortedMap<Key,Value> allEntriesToInject;
  static {
    SortedMap<Key,Value> t = new TreeMap<>();
    t.put(new Key(new Text("a1"), new Text("colF3"), new Text("colQ3"), System.currentTimeMillis()), new Value("1".getBytes()));
    t.put(new Key(new Text("c1"), new Text("colF3"), new Text("colQ3"), System.currentTimeMillis()), new Value("1".getBytes()));
    t.put(new Key(new Text("m1"), new Text("colF3"), new Text("colQ3"), System.currentTimeMillis()), new Value("1".getBytes()));
    allEntriesToInject = Collections.unmodifiableSortedMap(t); // for safety
  }

  // Current position within allEntriesToInject.
  private PeekingIterator<Map.Entry<Key,Value>> inner;
  // Range given to the last seek(); null until seek() is called, treated as an unbounded range.
  private Range seekRng;

  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
    // This iterator generates its own data; any parent source is deliberately discarded.
    if (source != null)
      log.info("HardListIterator ignores/replaces parent source passed in init(): " + source);

    IteratorUtil.IteratorScope scope = env.getIteratorScope();
    log.debug(this.getClass() + ": init on scope " + scope + (scope == IteratorUtil.IteratorScope.majc ? " fullScan=" + env.isFullMajorCompaction() : ""));

    // define behavior before seek as seek to start at negative infinity
    inner = new PeekingIterator<>(allEntriesToInject.entrySet().iterator());
  }

  @Override
  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
    // BUGFIX: use the public constructor instead of the deprecated Class.newInstance(),
    // which rethrows checked exceptions from the constructor unchecked.
    HardListIterator newInstance = new HardListIterator();
    // BUGFIX: carry over the seek range so the copy honors the same restriction.
    newInstance.seekRng = seekRng;
    if (inner != null && inner.hasNext()) {
      // Position the copy at the same entry this iterator is currently on.
      newInstance.inner = new PeekingIterator<>(allEntriesToInject.tailMap(inner.peek().getKey()).entrySet().iterator());
    } else {
      // BUGFIX: the old code called inner.peek() unconditionally, throwing
      // NoSuchElementException when this iterator was exhausted. An exhausted
      // iterator deep-copies to an exhausted iterator.
      newInstance.inner = new PeekingIterator<>(new TreeMap<Key,Value>().entrySet().iterator());
    }
    return newInstance;
  }

  @Override
  public boolean hasTop() {
    if (inner == null || !inner.hasNext())
      return false;
    Key k = inner.peek().getKey();
    // Do not return entries past the seek() range. BUGFIX: before seek() is called,
    // seekRng is null; the old code NPEd here instead of honoring the documented
    // "pre-seek behaves as seek to negative infinity" contract from init().
    return seekRng == null || seekRng.contains(k);
  }

  @Override
  public void next() throws IOException {
    inner.next();
  }

  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
    seekRng = range;
    // seek to first entry inside range
    if (range.isInfiniteStartKey())
      inner = new PeekingIterator<>(allEntriesToInject.entrySet().iterator());
    else if (range.isStartKeyInclusive())
      inner = new PeekingIterator<>(allEntriesToInject.tailMap(range.getStartKey()).entrySet().iterator());
    else
      inner = new PeekingIterator<>(allEntriesToInject.tailMap(range.getStartKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME)).entrySet()
          .iterator());
  }

  @Override
  public Key getTopKey() {
    // null when exhausted or outside the seek range, per the SKVI convention used here
    return hasTop() ? inner.peek().getKey() : null;
  }

  @Override
  public Value getTopValue() {
    return hasTop() ? inner.peek().getValue() : null;
  }
}
org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.AccumuloClusterIT; +import org.apache.accumulo.test.functional.BadIterator; import org.apache.hadoop.io.Text; import org.apache.thrift.TException; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -229,4 +238,122 @@ public class TableOperationsIT extends AccumuloClusterIT { return map; } + @Test + public void testCompactEmptyTableWithGeneratorIterator() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + + List<IteratorSetting> list = new ArrayList<>(); + list.add(new IteratorSetting(15, HardListIterator.class)); + connector.tableOperations().compact(tableName, null, null, list, true, true); + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ + for (Map.Entry<Key,Value> entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + assertEquals(HardListIterator.allEntriesToInject, actual); + connector.tableOperations().delete(tableName); + } + + /** Compare only the row, column family and column qualifier. 
*/ + static class KeyRowColFColQComparator implements Comparator<Key> { + @Override + public int compare(Key k1, Key k2) { + return k1.compareTo(k2, PartialKey.ROW_COLFAM_COLQUAL); + } + } + + static final KeyRowColFColQComparator COMPARE_KEY_TO_COLQ = new KeyRowColFColQComparator(); + + @Test + public void testCompactEmptyTableWithGeneratorIterator_Splits() throws TableExistsException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + SortedSet<Text> splitset = new TreeSet<>(); + splitset.add(new Text("f")); + connector.tableOperations().addSplits(tableName, splitset); + + List<IteratorSetting> list = new ArrayList<>(); + list.add(new IteratorSetting(15, HardListIterator.class)); + connector.tableOperations().compact(tableName, null, null, list, true, true); + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ + for (Map.Entry<Key,Value> entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + assertEquals(HardListIterator.allEntriesToInject, actual); + connector.tableOperations().delete(tableName); + } + + @Test + public void testCompactEmptyTableWithGeneratorIterator_Splits_Cancel() throws TableExistsException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + SortedSet<Text> splitset = new TreeSet<>(); + splitset.add(new Text("f")); + connector.tableOperations().addSplits(tableName, splitset); + + List<IteratorSetting> list = new ArrayList<>(); + list.add(new IteratorSetting(15, HardListIterator.class)); + connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block + connector.tableOperations().cancelCompaction(tableName); + // depending on timing, compaction will 
finish or be canceled + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ + for (Map.Entry<Key,Value> entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + Assume.assumeFalse("Compaction successfully occurred due to weird timing but we hoped it would cancel.", + HardListIterator.allEntriesToInject.equals(actual)); + assertTrue("Scan should be empty if compaction canceled. " + "Actual is " + actual, actual.isEmpty()); + connector.tableOperations().delete(tableName); + } + + @Test + public void testCompactEmptyTableWithGeneratorIterator_Splits_Partial() throws TableExistsException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + Text splitRow = new Text("f"); + SortedSet<Text> splitset = new TreeSet<>(); + splitset.add(splitRow); + connector.tableOperations().addSplits(tableName, splitset); + + List<IteratorSetting> list = new ArrayList<>(); + list.add(new IteratorSetting(15, HardListIterator.class)); + // compact the second tablet, not the first + connector.tableOperations().compact(tableName, splitRow, null, list, true, true); + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ + for (Map.Entry<Key,Value> entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + // only expect the entries in the second tablet + assertEquals(HardListIterator.allEntriesToInject.tailMap(new Key(splitRow)), actual); + connector.tableOperations().delete(tableName); + } + + /** Test recovery from bad majc iterator via compaction cancel. 
*/ + @Test + public void testCompactEmptyTablesWithBadIterator_FailsAndCancel() throws TableExistsException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + + List<IteratorSetting> list = new ArrayList<>(); + list.add(new IteratorSetting(15, BadIterator.class)); + connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block + UtilWaitThread.sleep(2000); // start compaction + connector.tableOperations().cancelCompaction(tableName); + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map<Key,Value> actual = new TreeMap<>(); + for (Map.Entry<Key,Value> entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + assertTrue("Should be empty. Actual is " + actual, actual.isEmpty()); + connector.tableOperations().delete(tableName); + } + }