Repository: accumulo Updated Branches: refs/heads/master a15ff128b -> f3b9d1925
ACCUMULO-1124 Shorten keys in index and uses statistics to choose better keys Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1ac7b4a9 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1ac7b4a9 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1ac7b4a9 Branch: refs/heads/master Commit: 1ac7b4a9ffe0ebd5e56c4bbb672eea1ba79dd688 Parents: fd37134 Author: Keith Turner <ktur...@apache.org> Authored: Fri May 27 11:54:43 2016 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Fri May 27 11:54:43 2016 -0400 ---------------------------------------------------------------------- .../accumulo/core/file/rfile/KeyShortener.java | 138 +++++++++++++++++ .../accumulo/core/file/rfile/PrintInfo.java | 86 +++++++++-- .../apache/accumulo/core/file/rfile/RFile.java | 64 ++++++-- .../core/file/rfile/RFileOperations.java | 2 +- .../accumulo/core/file/rfile/RelativeKey.java | 8 +- .../core/file/rfile/KeyShortenerTest.java | 147 +++++++++++++++++++ .../core/file/rfile/RFileMetricsTest.java | 2 +- .../accumulo/core/file/rfile/RFileTest.java | 67 ++++++++- .../core/file/rfile/RelativeKeyTest.java | 28 +++- 9 files changed, 499 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/KeyShortener.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/KeyShortener.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/KeyShortener.java new file mode 100644 index 0000000..b039982 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/KeyShortener.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.file.rfile; + +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Bytes; + +/* + * Code to shorten keys that will be placed into RFile indexes. This code attempts to find a key thats between two keys that shorter. 
+ */ +public class KeyShortener { + + private static final byte[] EMPTY = new byte[0]; + private static final byte[] B00 = new byte[] {(byte) 0x00}; + private static final byte[] BFF = new byte[] {(byte) 0xff}; + + private static final Logger log = LoggerFactory.getLogger(KeyShortener.class); + + private KeyShortener() {} + + private static int findNonFF(ByteSequence bs, int start) { + for (int i = start; i < bs.length(); i++) { + if (bs.byteAt(i) != (byte) 0xff) { + return i; + } + } + + return bs.length(); + } + + /* + * return S such that prev < S < current or null if no such sequence + */ + public static ByteSequence shorten(ByteSequence prev, ByteSequence current) { + + int minLen = Math.min(prev.length(), current.length()); + + for (int i = 0; i < minLen; i++) { + int pb = 0xff & prev.byteAt(i); + int cb = 0xff & current.byteAt(i); + + int diff = cb - pb; + + if (diff == 1) { + int newLen = findNonFF(prev, i + 1); + byte[] successor; + if (newLen < prev.length()) { + successor = Bytes.concat(prev.subSequence(0, newLen).toArray(), BFF); + } else { + successor = Bytes.concat(prev.subSequence(0, newLen).toArray(), B00); + } + return new ArrayByteSequence(successor); + } else if (diff > 1) { + byte[] copy = new byte[i + 1]; + System.arraycopy(prev.subSequence(0, i + 1).toArray(), 0, copy, 0, i + 1); + copy[i] = (byte) ((0xff & copy[i]) + 1); + return new ArrayByteSequence(copy); + } + } + + ArrayByteSequence successor = new ArrayByteSequence(Bytes.concat(prev.toArray(), B00)); + if (successor.equals(current)) { + return null; + } + + return successor; + } + + /* + * This entire class supports an optional optimization. This code does a sanity check to ensure the optimization code did what was intended, doing a noop if + * there is a bug. + */ + @VisibleForTesting + static Key sanityCheck(Key prev, Key current, Key shortened) { + if (prev.compareTo(shortened) >= 0) { + log.warn("Bug in key shortening code, please open an issue " + prev + " >= " + shortened); + return prev; + } + + if (current.compareTo(shortened) <= 0) { + log.warn("Bug in key shortening code, please open an issue " + current + " <= " + shortened); + return prev; + } + + return shortened; + } + + /* + * Find a key K where prev < K < current AND K is shorter. If can not find a K that meets criteria, then returns prev. + */ + public static Key shorten(Key prev, Key current) { + Preconditions.checkArgument(prev.compareTo(current) <= 0, "Expected key less than or equal. 
" + prev + " > " + current); + + if (prev.getRowData().compareTo(current.getRowData()) < 0) { + ByteSequence shortenedRow = shorten(prev.getRowData(), current.getRowData()); + if (shortenedRow == null) { + return prev; + } + return sanityCheck(prev, current, new Key(shortenedRow.toArray(), EMPTY, EMPTY, EMPTY, 0)); + } else if (prev.getColumnFamilyData().compareTo(current.getColumnFamilyData()) < 0) { + ByteSequence shortenedFam = shorten(prev.getColumnFamilyData(), current.getColumnFamilyData()); + if (shortenedFam == null) { + return prev; + } + return sanityCheck(prev, current, new Key(prev.getRowData().toArray(), shortenedFam.toArray(), EMPTY, EMPTY, 0)); + } else if (prev.getColumnQualifierData().compareTo(current.getColumnQualifierData()) < 0) { + ByteSequence shortenedQual = shorten(prev.getColumnQualifierData(), current.getColumnQualifierData()); + if (shortenedQual == null) { + return prev; + } + return sanityCheck(prev, current, new Key(prev.getRowData().toArray(), prev.getColumnFamilyData().toArray(), shortenedQual.toArray(), EMPTY, 0)); + } else { + return prev; + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java index cfe571c..e166557 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java @@ -32,6 +32,7 @@ import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; import org.apache.accumulo.start.spi.KeywordExecutable; +import org.apache.commons.math.stat.descriptive.SummaryStatistics; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -57,12 +58,52 @@ public class PrintInfo implements KeywordExecutable { boolean histogram = false; @Parameter(names = {"--useSample"}, description = "Use sample data for --dump, --vis, --histogram options") boolean useSample = false; + @Parameter(names = {"--keyStats"}, description = "print key length statistics for index and all data") + boolean keyStats = false; @Parameter(description = " <file> { <file> ... }") List<String> files = new ArrayList<String>(); @Parameter(names = {"-c", "--config"}, variableArity = true, description = "Comma-separated Hadoop configuration files") List<String> configFiles = new ArrayList<>(); } + static class LogHistogram { + long countBuckets[] = new long[11]; + long sizeBuckets[] = new long[countBuckets.length]; + long totalSize = 0; + + public void add(int size) { + int bucket = (int) Math.log10(size); + countBuckets[bucket]++; + sizeBuckets[bucket] += size; + totalSize += size; + } + + public void print(String indent) { + System.out.println(indent + "Up to size count %-age"); + for (int i = 1; i < countBuckets.length; i++) { + System.out.println(String.format("%s%11.0f : %10d %6.2f%%", indent, Math.pow(10, i), countBuckets[i], sizeBuckets[i] * 100. 
/ totalSize)); + } + } + } + + static class KeyStats { + private SummaryStatistics stats = new SummaryStatistics(); + private LogHistogram logHistogram = new LogHistogram(); + + public void add(Key k) { + int size = k.getSize(); + stats.addValue(size); + logHistogram.add(size); + } + + public void print(String indent) { + logHistogram.print(indent); + System.out.println(); + System.out.printf("%smin:%,11.2f max:%,11.2f avg:%,11.2f stddev:%,11.2f\n", indent, stats.getMin(), stats.getMax(), stats.getMean(), + stats.getStandardDeviation()); + } + } + public static void main(String[] args) throws Exception { new PrintInfo().execute(args); } @@ -90,9 +131,10 @@ public class PrintInfo implements KeywordExecutable { FileSystem hadoopFs = FileSystem.get(conf); FileSystem localFs = FileSystem.getLocal(conf); - long countBuckets[] = new long[11]; - long sizeBuckets[] = new long[countBuckets.length]; - long totalSize = 0; + LogHistogram kvHistogram = new LogHistogram(); + + KeyStats dataKeyStats = new KeyStats(); + KeyStats indexKeyStats = new KeyStats(); for (String arg : opts.files) { Path path = new Path(arg); @@ -119,7 +161,7 @@ public class PrintInfo implements KeywordExecutable { Map<String,ArrayList<ByteSequence>> localityGroupCF = null; - if (opts.histogram || opts.dump || opts.vis || opts.hash) { + if (opts.histogram || opts.dump || opts.vis || opts.hash || opts.keyStats) { localityGroupCF = iter.getLocalityGroupCF(); FileSKVIterator dataIter; @@ -134,6 +176,14 @@ public class PrintInfo implements KeywordExecutable { dataIter = iter; } + if (opts.keyStats) { + FileSKVIterator indexIter = iter.getIndex(); + while (indexIter.hasTop()) { + indexKeyStats.add(indexIter.getTopKey()); + indexIter.next(); + } + } + for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet()) { dataIter.seek(new Range((Key) null, (Key) null), cf.getValue(), true); @@ -146,30 +196,36 @@ public class PrintInfo implements KeywordExecutable { return; } if (opts.histogram) { - long size = key.getSize() + value.getSize(); - int bucket = (int) Math.log10(size); - countBuckets[bucket]++; - sizeBuckets[bucket] += size; - totalSize += size; + kvHistogram.add(key.getSize() + value.getSize()); + } + if (opts.keyStats) { + dataKeyStats.add(key); } dataIter.next(); } } } - System.out.println(); iter.close(); - if (opts.vis || opts.hash) + if (opts.vis || opts.hash) { + System.out.println(); vmg.printMetrics(opts.hash, "Visibility", System.out); + } if (opts.histogram) { - System.out.println("Up to size count %-age"); - for (int i = 1; i < countBuckets.length; i++) { - System.out.println(String.format("%11.0f : %10d %6.2f%%", Math.pow(10, i), countBuckets[i], sizeBuckets[i] * 100. / totalSize)); - } + System.out.println(); + kvHistogram.print(""); } + if (opts.keyStats) { + System.out.println(); + System.out.println("Statistics for keys in data :"); + dataKeyStats.print("\t"); + System.out.println(); + System.out.println("Statistics for keys in index :"); + indexKeyStats.print("\t"); + } // If the output stream has closed, there is no reason to keep going. 
if (System.out.checkError()) return; http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index a73936b..6032c7f 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@ -70,6 +70,7 @@ import org.apache.accumulo.core.sample.Sampler; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.util.MutableByteSequence; import org.apache.commons.lang.mutable.MutableLong; +import org.apache.commons.math.stat.descriptive.SummaryStatistics; import org.apache.hadoop.io.Writable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,6 +90,10 @@ public class RFile { static final int RINDEX_VER_8 = 8; // Added sample storage. There is a sample locality group for each locality group. Sample are built using a Sampler and // sampler configuration. The Sampler and its configuration are stored in RFile. Persisting the method of producing the // sample allows a user of RFile to determine if the sample is useful. + // + // Selected smaller keys for index by doing two things. First internal stats were used to look for keys that were below + // average in size for the index. Also keys that were statistically large were excluded from the index. Second shorter keys + // (that may not exist in data) were generated for the index. static final int RINDEX_VER_7 = 7; // Added support for prefix encoding and encryption. Before this change only exact matches within a key field were deduped // for consecutive keys. After this change, if consecutive key fields have the same prefix then the prefix is only stored // once. 
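
The RINDEX_VER_8 note above describes the two index changes in this commit: the writer prefers below-average-length keys as index entries (excluding statistically giant keys), and it generates shortened keys, which need not exist in the data, to separate blocks. Below is a minimal sketch of the second idea, using the KeyShortener API added by this commit and the expected result from KeyShortenerTest.testOneCharacterDifference; the wrapper class and main method are illustrative only, not part of the patch.

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.file.rfile.KeyShortener;

// Illustrative sketch; mirrors KeyShortenerTest.testOneCharacterDifference.
public class KeyShortenerSketch {
  public static void main(String[] args) {
    Key lastInBlock = new Key("r321hahahaha", "f89222", "q90232e"); // last key written to a block
    Key firstInNext = new Key("r321hbhahaha", "f89222", "q90232e"); // next key to be written

    // Returns a key that sorts strictly between the two inputs but is shorter:
    // here the row "r321ha" followed by a 0xff byte, with empty family and qualifier.
    // When no shorter separator exists, the previous key is returned unchanged.
    Key indexKey = KeyShortener.shorten(lastInBlock, firstInNext);

    System.out.println(indexKey + " size=" + indexKey.getSize());
  }
}

Because the shortened key may not exist in the data, the reader's seek logic below is updated to handle landing in the empty space between a block's last key and the next block's first key.
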
@@ -375,7 +380,8 @@ public class RFile { private ABlockWriter blockWriter; // private BlockAppender blockAppender; - private long blockSize = 100000; + private final long blockSize; + private final long maxBlockSize; private int entries = 0; private LocalityGroupMetadata currentLocalityGroup = null; @@ -386,13 +392,23 @@ public class RFile { private SampleLocalityGroupWriter sample; - LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, LocalityGroupMetadata currentLocalityGroup, SampleLocalityGroupWriter sample) { + private SummaryStatistics keyLenStats = new SummaryStatistics(); + private double avergageKeySize = 0; + + LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, long maxBlockSize, LocalityGroupMetadata currentLocalityGroup, + SampleLocalityGroupWriter sample) { this.fileWriter = fileWriter; this.blockSize = blockSize; + this.maxBlockSize = maxBlockSize; this.currentLocalityGroup = currentLocalityGroup; this.sample = sample; } + private boolean isGiantKey(Key k) { + // consider a key thats more than 3 standard deviations from previously seen key sizes as giant + return k.getSize() > keyLenStats.getMean() + keyLenStats.getStandardDeviation() * 3; + } + public void append(Key key, Value value) throws IOException { if (key.compareTo(prevKey) < 0) { @@ -412,8 +428,22 @@ public class RFile { if (blockWriter == null) { blockWriter = fileWriter.prepareDataBlock(); } else if (blockWriter.getRawSize() > blockSize) { - closeBlock(prevKey, false); - blockWriter = fileWriter.prepareDataBlock(); + + // Look for a key thats short to put in the index, defining short as average or below. + if (avergageKeySize == 0) { + // use the same average for the search for a below average key for a block + avergageKeySize = keyLenStats.getMean(); + } + + // Possibly produce a shorter key that does not exist in data. Even if a key can be shortened, it may not be below average. 
+ Key closeKey = KeyShortener.shorten(prevKey, key); + + if ((closeKey.getSize() <= avergageKeySize || blockWriter.getRawSize() > maxBlockSize) && !isGiantKey(closeKey)) { + closeBlock(closeKey, false); + blockWriter = fileWriter.prepareDataBlock(); + // set average to zero so its recomputed for the next block + avergageKeySize = 0; + } } RelativeKey rk = new RelativeKey(lastKeyInBlock, key); @@ -422,6 +452,8 @@ public class RFile { value.write(blockWriter); entries++; + keyLenStats.addValue(key.getSize()); + prevKey = new Key(key); lastKeyInBlock = prevKey; @@ -457,12 +489,14 @@ public class RFile { public static class Writer implements FileSKVWriter { public static final int MAX_CF_IN_DLG = 1000; + private static final double MAX_BLOCK_MULTIPLIER = 1.1; private BlockFileWriter fileWriter; // private BlockAppender blockAppender; - private long blockSize = 100000; - private int indexBlockSize; + private final long blockSize; + private final long maxBlockSize; + private final int indexBlockSize; private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>(); private ArrayList<LocalityGroupMetadata> sampleGroups = new ArrayList<LocalityGroupMetadata>(); @@ -487,6 +521,7 @@ public class RFile { public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize, SamplerConfigurationImpl samplerConfig, Sampler sampler) throws IOException { this.blockSize = blockSize; + this.maxBlockSize = (long) (blockSize * MAX_BLOCK_MULTIPLIER); this.indexBlockSize = indexBlockSize; this.fileWriter = bfw; previousColumnFamilies = new HashSet<ByteSequence>(); @@ -603,9 +638,9 @@ public class RFile { SampleLocalityGroupWriter sampleWriter = null; if (sampler != null) { - sampleWriter = new SampleLocalityGroupWriter(new LocalityGroupWriter(fileWriter, blockSize, sampleLocalityGroup, null), sampler); + sampleWriter = new SampleLocalityGroupWriter(new LocalityGroupWriter(fileWriter, blockSize, maxBlockSize, sampleLocalityGroup, null), sampler); } - lgWriter = new LocalityGroupWriter(fileWriter, blockSize, currentLocalityGroup, sampleWriter); + lgWriter = new LocalityGroupWriter(fileWriter, blockSize, maxBlockSize, currentLocalityGroup, sampleWriter); } @Override @@ -838,7 +873,7 @@ public class RFile { reseek = false; } - if (startKey.compareTo(getTopKey()) >= 0 && startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) { + if (entriesLeft > 0 && startKey.compareTo(getTopKey()) >= 0 && startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) { // start key is within the unconsumed portion of the current block // this code intentionally does not use the index associated with a cached block @@ -848,7 +883,7 @@ public class RFile { // and speed up others. MutableByteSequence valbs = new MutableByteSequence(new byte[64], 0, 0); - SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey()); + SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey(), entriesLeft); if (skippr.skipped > 0) { entriesLeft -= skippr.skipped; val = new Value(valbs.toArray()); @@ -859,6 +894,13 @@ public class RFile { reseek = false; } + if (entriesLeft == 0 && startKey.compareTo(getTopKey()) > 0 && startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) { + // In the empty space at the end of a block. This can occur when keys are shortened in the index creating index entries that do not exist in the + // block. These shortened index entires fall between the last key in a block and first key in the next block, but may not exist in the data. 
+ // Just proceed to the next block. + reseek = false; + } + if (iiter.previousIndex() == 0 && getTopKey().equals(firstKey) && startKey.compareTo(firstKey) <= 0) { // seeking before the beginning of the file, and already positioned at the first key in the file // so there is nothing to do @@ -921,7 +963,7 @@ public class RFile { } } - SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, currKey); + SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, currKey, entriesLeft); prevKey = skippr.prevKey; entriesLeft -= skippr.skipped; val = new Value(valbs.toArray()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java index c8b61b6..cc6aaa2 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java @@ -29,10 +29,10 @@ import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; +import org.apache.accumulo.core.file.streams.RateLimitedOutputStream; import org.apache.accumulo.core.sample.Sampler; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.sample.impl.SamplerFactory; -import org.apache.accumulo.core.file.streams.RateLimitedOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java index aeba4e2..98163b1 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java @@ -230,11 +230,7 @@ public class RelativeKey implements Writable { } } - public static SkippR fastSkip(DataInput in, Key seekKey, MutableByteSequence value, Key prevKey, Key currKey) throws IOException { - // this method assumes that fast skip is being called on a compressed block where the last key - // in the compressed block is >= seekKey... therefore this method shouldn't go past the end of the - // compressed block... 
if it does, there is probably an error in the caller's logic - + public static SkippR fastSkip(DataInput in, Key seekKey, MutableByteSequence value, Key prevKey, Key currKey, int entriesLeft) throws IOException { // this method mostly avoids object allocation and only does compares when the row changes MutableByteSequence row, cf, cq, cv; @@ -307,7 +303,7 @@ public class RelativeKey implements Writable { int count = 0; Key newPrevKey = null; - while (true) { + while (count < entriesLeft) { pdel = (fieldsSame & DELETED) == DELETED; http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/test/java/org/apache/accumulo/core/file/rfile/KeyShortenerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/KeyShortenerTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/KeyShortenerTest.java new file mode 100644 index 0000000..67ff70c --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/KeyShortenerTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.core.file.rfile; + +import org.apache.accumulo.core.data.Key; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.primitives.Bytes; + +public class KeyShortenerTest { + + private static final byte[] E = new byte[0]; + private static final byte[] FF = new byte[] {(byte) 0xff}; + + private void assertBetween(Key p, Key s, Key c) { + Assert.assertTrue(p.compareTo(s) < 0); + Assert.assertTrue(s.compareTo(c) < 0); + } + + private void testKeys(Key prev, Key current, Key expected) { + Key sk = KeyShortener.shorten(prev, current); + assertBetween(prev, sk, current); + } + + /** + * append 0xff to end of string + */ + private byte[] aff(String s) { + return Bytes.concat(s.getBytes(), FF); + } + + /** + * append 0x00 to end of string + */ + private byte[] a00(String s) { + return Bytes.concat(s.getBytes(), new byte[] {(byte) 0x00}); + } + + private byte[] toBytes(Object o) { + if (o instanceof String) { + return ((String) o).getBytes(); + } else if (o instanceof byte[]) { + return (byte[]) o; + } + + throw new IllegalArgumentException(); + } + + private Key nk(Object row, Object fam, Object qual, long ts) { + return new Key(toBytes(row), toBytes(fam), toBytes(qual), E, ts); + } + + @Test + public void testOneCharacterDifference() { + // row has char that differs by one byte + testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hbhahaha", "f89222", "q90232e"), nk(aff("r321ha"), E, E, 0)); + + // family has char that differs by one byte + testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahaha", "f89322", "q90232e"), nk("r321hahahaha", aff("f892"), E, 0)); + + // qualifier has char that differs by one byte + testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahaha", "f89222", "q91232e"), nk("r321hahahaha", "f89222", aff("q90"), 0)); + } + + @Test + public void testMultiCharacterDifference() { + // row has char that differs by two bytes + testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hchahaha", "f89222", "q90232e"), nk("r321hb", E, E, 0)); + + // family has char that differs by two bytes + testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahaha", "f89422", "q90232e"), nk("r321hahahaha", "f893", E, 0)); + + // qualifier has char that differs by two bytes + testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahaha", "f89222", "q92232e"), nk("r321hahahaha", "f89222", "q91", 0)); + } + + @Test + public void testOneCharacterDifferenceAndFF() { + byte[] ff1 = Bytes.concat(aff("mop"), "b".getBytes()); + byte[] ff2 = Bytes.concat(aff("mop"), FF, "b".getBytes()); + + byte[] eff1 = Bytes.concat(aff("mop"), FF, FF); + byte[] eff2 = Bytes.concat(aff("mop"), FF, FF, FF); + + testKeys(nk(ff1, "f89222", "q90232e", 34), new Key("mor56", "f89222", "q90232e"), nk(eff1, E, E, 0)); + testKeys(nk("r1", ff1, "q90232e", 34), new Key("r1", "mor56", "q90232e"), nk("r1", eff1, E, 0)); + testKeys(nk("r1", "f1", ff1, 34), new Key("r1", "f1", "mor56"), nk("r1", "f1", eff1, 0)); + + testKeys(nk(ff2, "f89222", "q90232e", 34), new Key("mor56", "f89222", "q90232e"), nk(eff2, E, E, 0)); + testKeys(nk("r1", ff2, "q90232e", 34), new Key("r1", "mor56", "q90232e"), nk("r1", eff2, E, 0)); + testKeys(nk("r1", "f1", ff2, 34), new Key("r1", "f1", "mor56"), nk("r1", "f1", eff2, 0)); + + } + + @Test + public void testOneCharacterDifferenceAtEnd() { + testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahahb", "f89222", "q90232e"), 
nk(a00("r321hahahaha"), E, E, 0)); + testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahaha", "f89223", "q90232e"), nk("r321hahahaha", a00("f89222"), E, 0)); + testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new Key("r321hahahaha", "f89222", "q90232f"), nk("r321hahahaha", "f89222", a00("q90232e"), 0)); + } + + @Test + public void testSamePrefix() { + testKeys(new Key("r3boot4", "f89222", "q90232e"), new Key("r3boot452", "f89222", "q90232e"), nk(a00("r3boot4"), E, E, 0)); + testKeys(new Key("r3boot4", "f892", "q90232e"), new Key("r3boot4", "f89222", "q90232e"), nk("r3boot4", a00("f892"), E, 0)); + testKeys(new Key("r3boot4", "f89222", "q902"), new Key("r3boot4", "f89222", "q90232e"), nk("r3boot4", "f89222", a00("q902"), 0)); + } + + @Test + public void testSamePrefixAnd00() { + Key prev = new Key("r3boot4", "f89222", "q90232e"); + Assert.assertEquals(prev, KeyShortener.shorten(prev, nk(a00("r3boot4"), "f89222", "q90232e", 8))); + prev = new Key("r3boot4", "f892", "q90232e"); + Assert.assertEquals(prev, KeyShortener.shorten(prev, nk("r3boot4", a00("f892"), "q90232e", 8))); + prev = new Key("r3boot4", "f89222", "q902"); + Assert.assertEquals(prev, KeyShortener.shorten(prev, nk("r3boot4", "f89222", a00("q902"), 8))); + } + + @Test + public void testSanityCheck1() { + // prev and shortened equal + Key prev = new Key("r001", "f002", "q006"); + Assert.assertEquals(prev, KeyShortener.sanityCheck(prev, new Key("r002", "f002", "q006"), new Key("r001", "f002", "q006"))); + // prev > shortened equal + Assert.assertEquals(prev, KeyShortener.sanityCheck(prev, new Key("r003", "f002", "q006"), new Key("r001", "f002", "q006"))); + // current and shortened equal + Assert.assertEquals(prev, KeyShortener.sanityCheck(prev, new Key("r003", "f002", "q006"), new Key("r003", "f002", "q006"))); + // shortened > current + Assert.assertEquals(prev, KeyShortener.sanityCheck(prev, new Key("r003", "f002", "q006"), new Key("r004", "f002", "q006"))); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java index 7f8c087..92a1d32 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java @@ -459,7 +459,7 @@ public class RFileMetricsTest { public void multiBlockMultiCFNonDefaultAndDefaultLocGroup() throws IOException { // test an rfile with multiple column families and multiple blocks in a non-default locality group and the default locality group - trf.openWriter(false, 20);// Each entry is a block + trf.openWriter(false, 10);// Each entry is a block Set<ByteSequence> lg1 = new HashSet<>(); lg1.add(new ArrayByteSequence("cf1")); lg1.add(new ArrayByteSequence("cf3")); http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 5f66503..d97a4db 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -57,6 +58,7 @@ import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; +import org.apache.accumulo.core.file.streams.PositionedOutputs; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; import org.apache.accumulo.core.metadata.MetadataTable; @@ -86,7 +88,6 @@ import com.google.common.hash.HashCode; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.primitives.Bytes; -import org.apache.accumulo.core.file.streams.PositionedOutputs; public class RFileTest { @@ -241,7 +242,7 @@ public class RFileTest { } public void openWriter() throws IOException { - openWriter(true, 1000); + openWriter(1000); } public void openWriter(int blockSize) throws IOException { @@ -385,6 +386,8 @@ public class RFileTest { String cvS = "" + (char) cv; for (int ts = 4; ts > 0; ts--) { Key k = nk(rowS, cfS, cqS, cvS, ts); + // check below ensures when all key sizes are same more than one index block is created + Assert.assertEquals(27, k.getSize()); k.setDeleted(true); Value v = nv("" + val); trf.writer.append(k, v); @@ -392,6 +395,7 @@ public class RFileTest { expectedValues.add(v); k = nk(rowS, cfS, cqS, cvS, ts); + Assert.assertEquals(27, k.getSize()); v = nv("" + val); trf.writer.append(k, v); expectedKeys.add(k); @@ -500,6 +504,15 @@ public class RFileTest { } } + // count the number of index entries + FileSKVIterator iiter = trf.reader.getIndex(); + int count = 0; + while (iiter.hasTop()) { + count++; + iiter.next(); + } + Assert.assertEquals(20, count); + trf.closeReader(); } @@ -2091,6 +2104,56 @@ public class RFileTest { } @Test + public void testBigKeys() throws IOException { + // this test ensures that big keys do not end up index + ArrayList<Key> keys = new ArrayList<>(); + + for (int i = 0; i < 1000; i++) { + String row = String.format("r%06d", i); + keys.add(new Key(row, "cf1", "cq1", 42)); + } + + // add a few keys with long rows + for (int i = 0; i < 1000; i += 100) { + String row = String.format("r%06d", i); + char ca[] = new char[1000]; + Arrays.fill(ca, 'b'); + row = row + new String(ca); + keys.add(new Key(row, "cf1", "cq1", 42)); + } + + Collections.sort(keys); + + TestRFile trf = new TestRFile(conf); + + trf.openWriter(); + + for (Key k : keys) { + trf.writer.append(k, new Value((k.hashCode() + "").getBytes())); + } + + trf.writer.close(); + + trf.openReader(); + + FileSKVIterator iiter = trf.reader.getIndex(); + while (iiter.hasTop()) { + Key k = iiter.getTopKey(); + Assert.assertTrue(k + " " + k.getSize() + " >= 20", k.getSize() < 20); + iiter.next(); + } + + Collections.shuffle(keys); + + for (Key key : keys) { + trf.reader.seek(new Range(key, null), EMPTY_COL_FAMS, false); + Assert.assertTrue(trf.reader.hasTop()); + Assert.assertEquals(key, trf.reader.getTopKey()); + Assert.assertEquals(new Value((key.hashCode() + "").getBytes()), trf.reader.getTopValue()); + } + } + + @Test public void testCryptoDoesntLeakSensitive() throws IOException { conf = 
setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF); // test an empty file http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java index e413448..4ca4b6c 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR; import org.apache.accumulo.core.util.MutableByteSequence; import org.apache.accumulo.core.util.UnsynchronizedBuffer; import org.junit.Before; @@ -178,7 +179,7 @@ public class RelativeKeyTest { Key currKey = null; MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0); - RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey); + RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size()); assertEquals(1, skippr.skipped); assertEquals(new Key(), skippr.prevKey); assertEquals(expectedKeys.get(0), skippr.rk.getKey()); @@ -192,7 +193,7 @@ public class RelativeKeyTest { seekKey = new Key("a", "b", "c", "d", 1); seekKey.setDeleted(true); - skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey); + skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size()); assertEquals(1, skippr.skipped); assertEquals(new Key(), skippr.prevKey); assertEquals(expectedKeys.get(0), skippr.rk.getKey()); @@ -203,13 +204,23 @@ public class RelativeKeyTest { } @Test(expected = EOFException.class) + public void testSeekAfterEverythingWrongCount() throws IOException { + Key seekKey = new Key("s", "t", "u", "v", 1); + Key prevKey = new Key(); + Key currKey = null; + MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0); + + RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size() + 1); + } + public void testSeekAfterEverything() throws IOException { Key seekKey = new Key("s", "t", "u", "v", 1); Key prevKey = new Key(); Key currKey = null; MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0); - RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey); + SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size()); + assertEquals(expectedKeys.size(), skippr.skipped); } @Test @@ -220,7 +231,7 @@ public class RelativeKeyTest { Key currKey = null; MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0); - RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey); + RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size()); assertEquals(seekIndex + 1, skippr.skipped); assertEquals(expectedKeys.get(seekIndex - 1), skippr.prevKey); @@ -236,14 +247,17 @@ public class RelativeKeyTest { int i; for (i = seekIndex; expectedKeys.get(i).compareTo(fKey) < 0; i++) {} - skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, prevKey, currKey); + int left = expectedKeys.size(); + + skippr = 
RelativeKey.fastSkip(in, expectedKeys.get(i), value, prevKey, currKey, expectedKeys.size()); assertEquals(i + 1, skippr.skipped); + left -= skippr.skipped; assertEquals(expectedKeys.get(i - 1), skippr.prevKey); assertEquals(expectedKeys.get(i), skippr.rk.getKey()); assertEquals(expectedValues.get(i).toString(), value.toString()); // try fast skipping to our current location - skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, expectedKeys.get(i - 1), expectedKeys.get(i)); + skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, expectedKeys.get(i - 1), expectedKeys.get(i), left); assertEquals(0, skippr.skipped); assertEquals(expectedKeys.get(i - 1), skippr.prevKey); assertEquals(expectedKeys.get(i), skippr.rk.getKey()); @@ -253,7 +267,7 @@ public class RelativeKeyTest { fKey = expectedKeys.get(i).followingKey(PartialKey.ROW_COLFAM); int j; for (j = i; expectedKeys.get(j).compareTo(fKey) < 0; j++) {} - skippr = RelativeKey.fastSkip(in, fKey, value, expectedKeys.get(i - 1), expectedKeys.get(i)); + skippr = RelativeKey.fastSkip(in, fKey, value, expectedKeys.get(i - 1), expectedKeys.get(i), left); assertEquals(j - i, skippr.skipped); assertEquals(expectedKeys.get(j - 1), skippr.prevKey); assertEquals(expectedKeys.get(j), skippr.rk.getKey());
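
For reference, testBigKeys above exercises the writer's giant-key filter: a key is kept out of the index when its length is more than three standard deviations above the running mean of the key lengths seen so far, which, together with the key shortening, is why the test can assert that every index key stays under 20 bytes. Below is a self-contained sketch of that heuristic, assuming commons-math's SummaryStatistics as imported in RFile.java; the class and method names are illustrative, not part of the patch.

import org.apache.commons.math.stat.descriptive.SummaryStatistics;

// Illustrative sketch of the giant-key check used by LocalityGroupWriter.
public class GiantKeyFilterSketch {

  private final SummaryStatistics keyLenStats = new SummaryStatistics();

  // record the length of every key appended so far
  void record(int keySize) {
    keyLenStats.addValue(keySize);
  }

  // a key is "giant" if it is more than 3 standard deviations above the mean length
  boolean isGiantKey(int keySize) {
    return keySize > keyLenStats.getMean() + keyLenStats.getStandardDeviation() * 3;
  }

  public static void main(String[] args) {
    GiantKeyFilterSketch filter = new GiantKeyFilterSketch();
    for (int i = 0; i < 1000; i++) {
      filter.record(27); // the uniform key size asserted in RFileTest
    }
    System.out.println(filter.isGiantKey(27));   // false: typical keys remain eligible for the index
    System.out.println(filter.isGiantKey(1007)); // true: a key with a ~1000 byte row, as in testBigKeys, is excluded
  }
}

Excluding outliers this way keeps an occasional huge key from bloating the index, while the key shortening keeps the common-case index entries small.
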