Repository: accumulo
Updated Branches:
  refs/heads/master a15ff128b -> f3b9d1925


ACCUMULO-1124 Shorten keys in index and use statistics to choose better keys


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1ac7b4a9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1ac7b4a9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1ac7b4a9

Branch: refs/heads/master
Commit: 1ac7b4a9ffe0ebd5e56c4bbb672eea1ba79dd688
Parents: fd37134
Author: Keith Turner <ktur...@apache.org>
Authored: Fri May 27 11:54:43 2016 -0400
Committer: Keith Turner <ktur...@apache.org>
Committed: Fri May 27 11:54:43 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/file/rfile/KeyShortener.java  | 138 +++++++++++++++++
 .../accumulo/core/file/rfile/PrintInfo.java     |  86 +++++++++--
 .../apache/accumulo/core/file/rfile/RFile.java  |  64 ++++++--
 .../core/file/rfile/RFileOperations.java        |   2 +-
 .../accumulo/core/file/rfile/RelativeKey.java   |   8 +-
 .../core/file/rfile/KeyShortenerTest.java       | 147 +++++++++++++++++++
 .../core/file/rfile/RFileMetricsTest.java       |   2 +-
 .../accumulo/core/file/rfile/RFileTest.java     |  67 ++++++++-
 .../core/file/rfile/RelativeKeyTest.java        |  28 +++-
 9 files changed, 499 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
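
The idea behind the change, as a rough illustration (not part of the commit): an index entry only needs to sort between the last key of one data block and the first key of the next, so a fabricated shorter key can stand in for the real last key. Using row values from the new KeyShortenerTest:

    // last key in block   : row "r321hahahaha"
    // first key in next   : row "r321hchahaha"
    // index entry written : row "r321hb"  (sorts strictly between the two and is half the length)

Statistics on previously seen key lengths are then used to prefer below-average keys when deciding where to close a block.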


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/KeyShortener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/KeyShortener.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/KeyShortener.java
new file mode 100644
index 0000000..b039982
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/KeyShortener.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.file.rfile;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Bytes;
+
+/*
+ * Code to shorten keys that will be placed into RFile indexes. This code attempts to find a key that falls between two given keys and is shorter.
+ */
+public class KeyShortener {
+
+  private static final byte[] EMPTY = new byte[0];
+  private static final byte[] B00 = new byte[] {(byte) 0x00};
+  private static final byte[] BFF = new byte[] {(byte) 0xff};
+
+  private static final Logger log = LoggerFactory.getLogger(KeyShortener.class);
+
+  private KeyShortener() {}
+
+  private static int findNonFF(ByteSequence bs, int start) {
+    for (int i = start; i < bs.length(); i++) {
+      if (bs.byteAt(i) != (byte) 0xff) {
+        return i;
+      }
+    }
+
+    return bs.length();
+  }
+
+  /*
+   * return S such that prev < S < current or null if no such sequence
+   */
+  public static ByteSequence shorten(ByteSequence prev, ByteSequence current) {
+
+    int minLen = Math.min(prev.length(), current.length());
+
+    for (int i = 0; i < minLen; i++) {
+      int pb = 0xff & prev.byteAt(i);
+      int cb = 0xff & current.byteAt(i);
+
+      int diff = cb - pb;
+
+      if (diff == 1) {
+        int newLen = findNonFF(prev, i + 1);
+        byte[] successor;
+        if (newLen < prev.length()) {
+          successor = Bytes.concat(prev.subSequence(0, newLen).toArray(), BFF);
+        } else {
+          successor = Bytes.concat(prev.subSequence(0, newLen).toArray(), B00);
+        }
+        return new ArrayByteSequence(successor);
+      } else if (diff > 1) {
+        byte[] copy = new byte[i + 1];
+        System.arraycopy(prev.subSequence(0, i + 1).toArray(), 0, copy, 0, i + 1);
+        copy[i] = (byte) ((0xff & copy[i]) + 1);
+        return new ArrayByteSequence(copy);
+      }
+    }
+
+    ArrayByteSequence successor = new ArrayByteSequence(Bytes.concat(prev.toArray(), B00));
+    if (successor.equals(current)) {
+      return null;
+    }
+
+    return successor;
+  }
+
+  /*
+   * This entire class supports an optional optimization. This code does a sanity check to ensure the optimization code did what was intended, doing a noop if
+   * there is a bug.
+   */
+  @VisibleForTesting
+  static Key sanityCheck(Key prev, Key current, Key shortened) {
+    if (prev.compareTo(shortened) >= 0) {
+      log.warn("Bug in key shortening code, please open an issue " + prev + " 
>= " + shortened);
+      return prev;
+    }
+
+    if (current.compareTo(shortened) <= 0) {
+      log.warn("Bug in key shortening code, please open an issue " + current + 
" <= " + shortened);
+      return prev;
+    }
+
+    return shortened;
+  }
+
+  /*
+   * Find a key K where prev < K < current AND K is shorter. If no K meeting these criteria can be found, then prev is returned.
+   */
+  public static Key shorten(Key prev, Key current) {
+    Preconditions.checkArgument(prev.compareTo(current) <= 0, "Expected key less than or equal. " + prev + " > " + current);
+
+    if (prev.getRowData().compareTo(current.getRowData()) < 0) {
+      ByteSequence shortenedRow = shorten(prev.getRowData(), current.getRowData());
+      if (shortenedRow == null) {
+        return prev;
+      }
+      return sanityCheck(prev, current, new Key(shortenedRow.toArray(), EMPTY, EMPTY, EMPTY, 0));
+    } else if (prev.getColumnFamilyData().compareTo(current.getColumnFamilyData()) < 0) {
+      ByteSequence shortenedFam = shorten(prev.getColumnFamilyData(), current.getColumnFamilyData());
+      if (shortenedFam == null) {
+        return prev;
+      }
+      return sanityCheck(prev, current, new Key(prev.getRowData().toArray(), shortenedFam.toArray(), EMPTY, EMPTY, 0));
+    } else if (prev.getColumnQualifierData().compareTo(current.getColumnQualifierData()) < 0) {
+      ByteSequence shortenedQual = shorten(prev.getColumnQualifierData(), current.getColumnQualifierData());
+      if (shortenedQual == null) {
+        return prev;
+      }
+      return sanityCheck(prev, current, new Key(prev.getRowData().toArray(), prev.getColumnFamilyData().toArray(), shortenedQual.toArray(), EMPTY, 0));
+    } else {
+      return prev;
+    }
+  }
+}

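A minimal usage sketch of the new helper (illustrative only; the committed tests assert only that the result sorts between the inputs):

    Key prev = new Key("r321hahahaha", "f89222", "q90232e");
    Key curr = new Key("r321hbhahaha", "f89222", "q90232e");
    Key shortened = KeyShortener.shorten(prev, curr);
    // prev < shortened < curr; here the rows differ by one byte, so the result is
    // the common row prefix plus 0xff ("r321ha\xff") with empty family and qualifier,
    // a key that never appears in the data but is valid as an index entry.
    assert prev.compareTo(shortened) < 0 && shortened.compareTo(curr) < 0;
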
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
index cfe571c..e166557 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.commons.math.stat.descriptive.SummaryStatistics;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,12 +58,52 @@ public class PrintInfo implements KeywordExecutable {
     boolean histogram = false;
     @Parameter(names = {"--useSample"}, description = "Use sample data for 
--dump, --vis, --histogram options")
     boolean useSample = false;
+    @Parameter(names = {"--keyStats"}, description = "print key length 
statistics for index and all data")
+    boolean keyStats = false;
     @Parameter(description = " <file> { <file> ... }")
     List<String> files = new ArrayList<String>();
     @Parameter(names = {"-c", "--config"}, variableArity = true, description = 
"Comma-separated Hadoop configuration files")
     List<String> configFiles = new ArrayList<>();
   }
 
+  static class LogHistogram {
+    long countBuckets[] = new long[11];
+    long sizeBuckets[] = new long[countBuckets.length];
+    long totalSize = 0;
+
+    public void add(int size) {
+      int bucket = (int) Math.log10(size);
+      countBuckets[bucket]++;
+      sizeBuckets[bucket] += size;
+      totalSize += size;
+    }
+
+    public void print(String indent) {
+      System.out.println(indent + "Up to size      count      %-age");
+      for (int i = 1; i < countBuckets.length; i++) {
+        System.out.println(String.format("%s%11.0f : %10d %6.2f%%", indent, 
Math.pow(10, i), countBuckets[i], sizeBuckets[i] * 100. / totalSize));
+      }
+    }
+  }
+
+  static class KeyStats {
+    private SummaryStatistics stats = new SummaryStatistics();
+    private LogHistogram logHistogram = new LogHistogram();
+
+    public void add(Key k) {
+      int size = k.getSize();
+      stats.addValue(size);
+      logHistogram.add(size);
+    }
+
+    public void print(String indent) {
+      logHistogram.print(indent);
+      System.out.println();
+      System.out.printf("%smin:%,11.2f max:%,11.2f avg:%,11.2f 
stddev:%,11.2f\n", indent, stats.getMin(), stats.getMax(), stats.getMean(),
+          stats.getStandardDeviation());
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     new PrintInfo().execute(args);
   }
@@ -90,9 +131,10 @@ public class PrintInfo implements KeywordExecutable {
     FileSystem hadoopFs = FileSystem.get(conf);
     FileSystem localFs = FileSystem.getLocal(conf);
 
-    long countBuckets[] = new long[11];
-    long sizeBuckets[] = new long[countBuckets.length];
-    long totalSize = 0;
+    LogHistogram kvHistogram = new LogHistogram();
+
+    KeyStats dataKeyStats = new KeyStats();
+    KeyStats indexKeyStats = new KeyStats();
 
     for (String arg : opts.files) {
       Path path = new Path(arg);
@@ -119,7 +161,7 @@ public class PrintInfo implements KeywordExecutable {
 
       Map<String,ArrayList<ByteSequence>> localityGroupCF = null;
 
-      if (opts.histogram || opts.dump || opts.vis || opts.hash) {
+      if (opts.histogram || opts.dump || opts.vis || opts.hash || opts.keyStats) {
         localityGroupCF = iter.getLocalityGroupCF();
 
         FileSKVIterator dataIter;
@@ -134,6 +176,14 @@ public class PrintInfo implements KeywordExecutable {
           dataIter = iter;
         }
 
+        if (opts.keyStats) {
+          FileSKVIterator indexIter = iter.getIndex();
+          while (indexIter.hasTop()) {
+            indexKeyStats.add(indexIter.getTopKey());
+            indexIter.next();
+          }
+        }
+
         for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet()) {
 
           dataIter.seek(new Range((Key) null, (Key) null), cf.getValue(), true);
@@ -146,30 +196,36 @@ public class PrintInfo implements KeywordExecutable {
                 return;
             }
             if (opts.histogram) {
-              long size = key.getSize() + value.getSize();
-              int bucket = (int) Math.log10(size);
-              countBuckets[bucket]++;
-              sizeBuckets[bucket] += size;
-              totalSize += size;
+              kvHistogram.add(key.getSize() + value.getSize());
+            }
+            if (opts.keyStats) {
+              dataKeyStats.add(key);
             }
             dataIter.next();
           }
         }
       }
-      System.out.println();
 
       iter.close();
 
-      if (opts.vis || opts.hash)
+      if (opts.vis || opts.hash) {
+        System.out.println();
         vmg.printMetrics(opts.hash, "Visibility", System.out);
+      }
 
       if (opts.histogram) {
-        System.out.println("Up to size      count      %-age");
-        for (int i = 1; i < countBuckets.length; i++) {
-          System.out.println(String.format("%11.0f : %10d %6.2f%%", 
Math.pow(10, i), countBuckets[i], sizeBuckets[i] * 100. / totalSize));
-        }
+        System.out.println();
+        kvHistogram.print("");
       }
 
+      if (opts.keyStats) {
+        System.out.println();
+        System.out.println("Statistics for keys in data :");
+        dataKeyStats.print("\t");
+        System.out.println();
+        System.out.println("Statistics for keys in index :");
+        indexKeyStats.print("\t");
+      }
       // If the output stream has closed, there is no reason to keep going.
       if (System.out.checkError())
         return;

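For reference, the bucketing the extracted LogHistogram performs is plain log10 binning (illustrative sizes, not from the commit):

    // LogHistogram.add(size) increments countBuckets[(int) Math.log10(size)]
    //   size 8      -> bucket 0
    //   size 512    -> bucket 2
    //   size 70000  -> bucket 4
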
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index a73936b..6032c7f 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -70,6 +70,7 @@ import org.apache.accumulo.core.sample.Sampler;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.util.MutableByteSequence;
 import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.math.stat.descriptive.SummaryStatistics;
 import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,6 +90,10 @@ public class RFile {
  static final int RINDEX_VER_8 = 8; // Added sample storage. There is a sample locality group for each locality group. Sample are built using a Sampler and
                                     // sampler configuration. The Sampler and its configuration are stored in RFile. Persisting the method of producing the
                                     // sample allows a user of RFile to determine if the sample is useful.
+                                     //
+                                     // Selected smaller keys for index by doing two things. First internal stats were used to look for keys that were below
+                                     // average in size for the index. Also keys that were statistically large were excluded from the index. Second shorter keys
+                                     // (that may not exist in data) were generated for the index.
  static final int RINDEX_VER_7 = 7; // Added support for prefix encoding and encryption. Before this change only exact matches within a key field were deduped
                                     // for consecutive keys. After this change, if consecutive key fields have the same prefix then the prefix is only stored
                                     // once.
@@ -375,7 +380,8 @@ public class RFile {
     private ABlockWriter blockWriter;
 
     // private BlockAppender blockAppender;
-    private long blockSize = 100000;
+    private final long blockSize;
+    private final long maxBlockSize;
     private int entries = 0;
 
     private LocalityGroupMetadata currentLocalityGroup = null;
@@ -386,13 +392,23 @@ public class RFile {
 
     private SampleLocalityGroupWriter sample;
 
-    LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, LocalityGroupMetadata currentLocalityGroup, SampleLocalityGroupWriter sample) {
+    private SummaryStatistics keyLenStats = new SummaryStatistics();
+    private double avergageKeySize = 0;
+
+    LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, long maxBlockSize, LocalityGroupMetadata currentLocalityGroup,
+        SampleLocalityGroupWriter sample) {
       this.fileWriter = fileWriter;
       this.blockSize = blockSize;
+      this.maxBlockSize = maxBlockSize;
       this.currentLocalityGroup = currentLocalityGroup;
       this.sample = sample;
     }
 
+    private boolean isGiantKey(Key k) {
+      // consider a key that is more than 3 standard deviations above the mean of previously seen key sizes as giant
+      return k.getSize() > keyLenStats.getMean() + keyLenStats.getStandardDeviation() * 3;
+    }
+
     public void append(Key key, Value value) throws IOException {
 
       if (key.compareTo(prevKey) < 0) {
@@ -412,8 +428,22 @@ public class RFile {
       if (blockWriter == null) {
         blockWriter = fileWriter.prepareDataBlock();
       } else if (blockWriter.getRawSize() > blockSize) {
-        closeBlock(prevKey, false);
-        blockWriter = fileWriter.prepareDataBlock();
+
+        // Look for a key that is short to put in the index, defining short as average or below.
+        if (avergageKeySize == 0) {
+          // use the same average for the search for a below average key for a block
+          avergageKeySize = keyLenStats.getMean();
+        }
+
+        // Possibly produce a shorter key that does not exist in data. Even if a key can be shortened, it may not be below average.
+        Key closeKey = KeyShortener.shorten(prevKey, key);
+
+        if ((closeKey.getSize() <= avergageKeySize || blockWriter.getRawSize() > maxBlockSize) && !isGiantKey(closeKey)) {
+          closeBlock(closeKey, false);
+          blockWriter = fileWriter.prepareDataBlock();
+          // set average to zero so it is recomputed for the next block
+          avergageKeySize = 0;
+        }
       }
 
       RelativeKey rk = new RelativeKey(lastKeyInBlock, key);
@@ -422,6 +452,8 @@ public class RFile {
       value.write(blockWriter);
       entries++;
 
+      keyLenStats.addValue(key.getSize());
+
       prevKey = new Key(key);
       lastKeyInBlock = prevKey;
 
@@ -457,12 +489,14 @@ public class RFile {
   public static class Writer implements FileSKVWriter {
 
     public static final int MAX_CF_IN_DLG = 1000;
+    private static final double MAX_BLOCK_MULTIPLIER = 1.1;
 
     private BlockFileWriter fileWriter;
 
     // private BlockAppender blockAppender;
-    private long blockSize = 100000;
-    private int indexBlockSize;
+    private final long blockSize;
+    private final long maxBlockSize;
+    private final int indexBlockSize;
 
     private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
     private ArrayList<LocalityGroupMetadata> sampleGroups = new ArrayList<LocalityGroupMetadata>();
@@ -487,6 +521,7 @@ public class RFile {
 
     public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize, SamplerConfigurationImpl samplerConfig, Sampler sampler) throws IOException {
       this.blockSize = blockSize;
+      this.maxBlockSize = (long) (blockSize * MAX_BLOCK_MULTIPLIER);
       this.indexBlockSize = indexBlockSize;
       this.fileWriter = bfw;
       previousColumnFamilies = new HashSet<ByteSequence>();
@@ -603,9 +638,9 @@ public class RFile {
 
       SampleLocalityGroupWriter sampleWriter = null;
       if (sampler != null) {
-        sampleWriter = new SampleLocalityGroupWriter(new LocalityGroupWriter(fileWriter, blockSize, sampleLocalityGroup, null), sampler);
+        sampleWriter = new SampleLocalityGroupWriter(new LocalityGroupWriter(fileWriter, blockSize, maxBlockSize, sampleLocalityGroup, null), sampler);
       }
-      lgWriter = new LocalityGroupWriter(fileWriter, blockSize, currentLocalityGroup, sampleWriter);
+      lgWriter = new LocalityGroupWriter(fileWriter, blockSize, maxBlockSize, currentLocalityGroup, sampleWriter);
     }
 
     @Override
@@ -838,7 +873,7 @@ public class RFile {
           reseek = false;
         }
 
-        if (startKey.compareTo(getTopKey()) >= 0 && startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) {
+        if (entriesLeft > 0 && startKey.compareTo(getTopKey()) >= 0 && startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) {
           // start key is within the unconsumed portion of the current block
 
           // this code intentionally does not use the index associated with a cached block
@@ -848,7 +883,7 @@ public class RFile {
           // and speed up others.
 
           MutableByteSequence valbs = new MutableByteSequence(new byte[64], 0, 0);
-          SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey());
+          SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey(), entriesLeft);
           if (skippr.skipped > 0) {
             entriesLeft -= skippr.skipped;
             val = new Value(valbs.toArray());
@@ -859,6 +894,13 @@ public class RFile {
           reseek = false;
         }
 
+        if (entriesLeft == 0 && startKey.compareTo(getTopKey()) > 0 && startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) {
+          // In the empty space at the end of a block. This can occur when keys are shortened in the index, creating index entries that do not exist in the
+          // block. These shortened index entries fall between the last key in a block and the first key in the next block, but may not exist in the data.
+          // Just proceed to the next block.
+          reseek = false;
+        }
+
         if (iiter.previousIndex() == 0 && getTopKey().equals(firstKey) && startKey.compareTo(firstKey) <= 0) {
           // seeking before the beginning of the file, and already positioned at the first key in the file
           // so there is nothing to do
@@ -921,7 +963,7 @@ public class RFile {
             }
           }
 
-          SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, currKey);
+          SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, currKey, entriesLeft);
           prevKey = skippr.prevKey;
           entriesLeft -= skippr.skipped;
           val = new Value(valbs.toArray());

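A simplified sketch of the block-close decision added above (illustrative, not the committed code; names follow the diff):

    // once a data block passes its target size, delay closing it until a cheap index key is available
    if (blockWriter.getRawSize() > blockSize) {                      // past the target block size
      Key indexKey = KeyShortener.shorten(prevKey, key);             // possibly a fabricated, shorter key
      boolean shortEnough = indexKey.getSize() <= keyLenStats.getMean();
      boolean pastHardCap = blockWriter.getRawSize() > maxBlockSize; // blockSize * 1.1
      if ((shortEnough || pastHardCap) && !isGiantKey(indexKey)) {
        closeBlock(indexKey, false);                                 // indexKey becomes the index entry
      }
    }
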
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index c8b61b6..cc6aaa2 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -29,10 +29,10 @@ import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.accumulo.core.sample.Sampler;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.sample.impl.SamplerFactory;
-import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
index aeba4e2..98163b1 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
@@ -230,11 +230,7 @@ public class RelativeKey implements Writable {
     }
   }
 
-  public static SkippR fastSkip(DataInput in, Key seekKey, MutableByteSequence value, Key prevKey, Key currKey) throws IOException {
-    // this method assumes that fast skip is being called on a compressed block where the last key
-    // in the compressed block is >= seekKey... therefore this method shouldn't go past the end of the
-    // compressed block... if it does, there is probably an error in the caller's logic
-
+  public static SkippR fastSkip(DataInput in, Key seekKey, MutableByteSequence value, Key prevKey, Key currKey, int entriesLeft) throws IOException {
     // this method mostly avoids object allocation and only does compares when the row changes
 
     MutableByteSequence row, cf, cq, cv;
@@ -307,7 +303,7 @@ public class RelativeKey implements Writable {
     int count = 0;
     Key newPrevKey = null;
 
-    while (true) {
+    while (count < entriesLeft) {
 
       pdel = (fieldsSame & DELETED) == DELETED;
 

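The practical effect of the new entriesLeft parameter (a sketch of the contract, based on the updated tests): the caller bounds the scan by the number of entries remaining in the block, so seeking to a shortened index key that sorts after every real key in the block terminates cleanly instead of reading past the block.

    SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, entriesLeft);
    // skippr.skipped <= entriesLeft; when seekKey sorts after every key in the block,
    // skipped == entriesLeft and the reader simply moves on to the next block.
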
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/test/java/org/apache/accumulo/core/file/rfile/KeyShortenerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/KeyShortenerTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/KeyShortenerTest.java
new file mode 100644
index 0000000..67ff70c
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/KeyShortenerTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.file.rfile;
+
+import org.apache.accumulo.core.data.Key;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.primitives.Bytes;
+
+public class KeyShortenerTest {
+
+  private static final byte[] E = new byte[0];
+  private static final byte[] FF = new byte[] {(byte) 0xff};
+
+  private void assertBetween(Key p, Key s, Key c) {
+    Assert.assertTrue(p.compareTo(s) < 0);
+    Assert.assertTrue(s.compareTo(c) < 0);
+  }
+
+  private void testKeys(Key prev, Key current, Key expected) {
+    Key sk = KeyShortener.shorten(prev, current);
+    assertBetween(prev, sk, current);
+  }
+
+  /**
+   * append 0xff to end of string
+   */
+  private byte[] aff(String s) {
+    return Bytes.concat(s.getBytes(), FF);
+  }
+
+  /**
+   * append 0x00 to end of string
+   */
+  private byte[] a00(String s) {
+    return Bytes.concat(s.getBytes(), new byte[] {(byte) 0x00});
+  }
+
+  private byte[] toBytes(Object o) {
+    if (o instanceof String) {
+      return ((String) o).getBytes();
+    } else if (o instanceof byte[]) {
+      return (byte[]) o;
+    }
+
+    throw new IllegalArgumentException();
+  }
+
+  private Key nk(Object row, Object fam, Object qual, long ts) {
+    return new Key(toBytes(row), toBytes(fam), toBytes(qual), E, ts);
+  }
+
+  @Test
+  public void testOneCharacterDifference() {
+    // row has char that differs by one byte
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new 
Key("r321hbhahaha", "f89222", "q90232e"), nk(aff("r321ha"), E, E, 0));
+
+    // family has char that differs by one byte
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new 
Key("r321hahahaha", "f89322", "q90232e"), nk("r321hahahaha", aff("f892"), E, 
0));
+
+    // qualifier has char that differs by one byte
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new 
Key("r321hahahaha", "f89222", "q91232e"), nk("r321hahahaha", "f89222", 
aff("q90"), 0));
+  }
+
+  @Test
+  public void testMultiCharacterDifference() {
+    // row has char that differs by two bytes
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new 
Key("r321hchahaha", "f89222", "q90232e"), nk("r321hb", E, E, 0));
+
+    // family has char that differs by two bytes
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new 
Key("r321hahahaha", "f89422", "q90232e"), nk("r321hahahaha", "f893", E, 0));
+
+    // qualifier has char that differs by two bytes
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new 
Key("r321hahahaha", "f89222", "q92232e"), nk("r321hahahaha", "f89222", "q91", 
0));
+  }
+
+  @Test
+  public void testOneCharacterDifferenceAndFF() {
+    byte[] ff1 = Bytes.concat(aff("mop"), "b".getBytes());
+    byte[] ff2 = Bytes.concat(aff("mop"), FF, "b".getBytes());
+
+    byte[] eff1 = Bytes.concat(aff("mop"), FF, FF);
+    byte[] eff2 = Bytes.concat(aff("mop"), FF, FF, FF);
+
+    testKeys(nk(ff1, "f89222", "q90232e", 34), new Key("mor56", "f89222", 
"q90232e"), nk(eff1, E, E, 0));
+    testKeys(nk("r1", ff1, "q90232e", 34), new Key("r1", "mor56", "q90232e"), 
nk("r1", eff1, E, 0));
+    testKeys(nk("r1", "f1", ff1, 34), new Key("r1", "f1", "mor56"), nk("r1", 
"f1", eff1, 0));
+
+    testKeys(nk(ff2, "f89222", "q90232e", 34), new Key("mor56", "f89222", 
"q90232e"), nk(eff2, E, E, 0));
+    testKeys(nk("r1", ff2, "q90232e", 34), new Key("r1", "mor56", "q90232e"), 
nk("r1", eff2, E, 0));
+    testKeys(nk("r1", "f1", ff2, 34), new Key("r1", "f1", "mor56"), nk("r1", 
"f1", eff2, 0));
+
+  }
+
+  @Test
+  public void testOneCharacterDifferenceAtEnd() {
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new 
Key("r321hahahahb", "f89222", "q90232e"), nk(a00("r321hahahaha"), E, E, 0));
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new 
Key("r321hahahaha", "f89223", "q90232e"), nk("r321hahahaha", a00("f89222"), E, 
0));
+    testKeys(new Key("r321hahahaha", "f89222", "q90232e"), new 
Key("r321hahahaha", "f89222", "q90232f"), nk("r321hahahaha", "f89222", 
a00("q90232e"), 0));
+  }
+
+  @Test
+  public void testSamePrefix() {
+    testKeys(new Key("r3boot4", "f89222", "q90232e"), new Key("r3boot452", 
"f89222", "q90232e"), nk(a00("r3boot4"), E, E, 0));
+    testKeys(new Key("r3boot4", "f892", "q90232e"), new Key("r3boot4", 
"f89222", "q90232e"), nk("r3boot4", a00("f892"), E, 0));
+    testKeys(new Key("r3boot4", "f89222", "q902"), new Key("r3boot4", 
"f89222", "q90232e"), nk("r3boot4", "f89222", a00("q902"), 0));
+  }
+
+  @Test
+  public void testSamePrefixAnd00() {
+    Key prev = new Key("r3boot4", "f89222", "q90232e");
+    Assert.assertEquals(prev, KeyShortener.shorten(prev, nk(a00("r3boot4"), "f89222", "q90232e", 8)));
+    prev = new Key("r3boot4", "f892", "q90232e");
+    Assert.assertEquals(prev, KeyShortener.shorten(prev, nk("r3boot4", a00("f892"), "q90232e", 8)));
+    prev = new Key("r3boot4", "f89222", "q902");
+    Assert.assertEquals(prev, KeyShortener.shorten(prev, nk("r3boot4", "f89222", a00("q902"), 8)));
+  }
+
+  @Test
+  public void testSanityCheck1() {
+    // prev and shortened equal
+    Key prev = new Key("r001", "f002", "q006");
+    Assert.assertEquals(prev, KeyShortener.sanityCheck(prev, new Key("r002", "f002", "q006"), new Key("r001", "f002", "q006")));
+    // prev > shortened
+    Assert.assertEquals(prev, KeyShortener.sanityCheck(prev, new Key("r003", "f002", "q006"), new Key("r001", "f002", "q006")));
+    // current and shortened equal
+    Assert.assertEquals(prev, KeyShortener.sanityCheck(prev, new Key("r003", "f002", "q006"), new Key("r003", "f002", "q006")));
+    // shortened > current
+    Assert.assertEquals(prev, KeyShortener.sanityCheck(prev, new Key("r003", "f002", "q006"), new Key("r004", "f002", "q006")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
index 7f8c087..92a1d32 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileMetricsTest.java
@@ -459,7 +459,7 @@ public class RFileMetricsTest {
   public void multiBlockMultiCFNonDefaultAndDefaultLocGroup() throws IOException {
     // test an rfile with multiple column families and multiple blocks in a non-default locality group and the default locality group
 
-    trf.openWriter(false, 20);// Each entry is a block
+    trf.openWriter(false, 10);// Each entry is a block
     Set<ByteSequence> lg1 = new HashSet<>();
     lg1.add(new ArrayByteSequence("cf1"));
     lg1.add(new ArrayByteSequence("cf3"));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 5f66503..d97a4db 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -57,6 +58,7 @@ import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.file.streams.PositionedOutputs;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
@@ -86,7 +88,6 @@ import com.google.common.hash.HashCode;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.primitives.Bytes;
-import org.apache.accumulo.core.file.streams.PositionedOutputs;
 
 public class RFileTest {
 
@@ -241,7 +242,7 @@ public class RFileTest {
     }
 
     public void openWriter() throws IOException {
-      openWriter(true, 1000);
+      openWriter(1000);
     }
 
     public void openWriter(int blockSize) throws IOException {
@@ -385,6 +386,8 @@ public class RFileTest {
             String cvS = "" + (char) cv;
             for (int ts = 4; ts > 0; ts--) {
               Key k = nk(rowS, cfS, cqS, cvS, ts);
+              // check below ensures that when all key sizes are the same, more than one index block is created
+              Assert.assertEquals(27, k.getSize());
               k.setDeleted(true);
               Value v = nv("" + val);
               trf.writer.append(k, v);
@@ -392,6 +395,7 @@ public class RFileTest {
               expectedValues.add(v);
 
               k = nk(rowS, cfS, cqS, cvS, ts);
+              Assert.assertEquals(27, k.getSize());
               v = nv("" + val);
               trf.writer.append(k, v);
               expectedKeys.add(k);
@@ -500,6 +504,15 @@ public class RFileTest {
       }
     }
 
+    // count the number of index entries
+    FileSKVIterator iiter = trf.reader.getIndex();
+    int count = 0;
+    while (iiter.hasTop()) {
+      count++;
+      iiter.next();
+    }
+    Assert.assertEquals(20, count);
+
     trf.closeReader();
   }
 
@@ -2091,6 +2104,56 @@ public class RFileTest {
   }
 
   @Test
+  public void testBigKeys() throws IOException {
+    // this test ensures that big keys do not end up in the index
+    ArrayList<Key> keys = new ArrayList<>();
+
+    for (int i = 0; i < 1000; i++) {
+      String row = String.format("r%06d", i);
+      keys.add(new Key(row, "cf1", "cq1", 42));
+    }
+
+    // add a few keys with long rows
+    for (int i = 0; i < 1000; i += 100) {
+      String row = String.format("r%06d", i);
+      char ca[] = new char[1000];
+      Arrays.fill(ca, 'b');
+      row = row + new String(ca);
+      keys.add(new Key(row, "cf1", "cq1", 42));
+    }
+
+    Collections.sort(keys);
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter();
+
+    for (Key k : keys) {
+      trf.writer.append(k, new Value((k.hashCode() + "").getBytes()));
+    }
+
+    trf.writer.close();
+
+    trf.openReader();
+
+    FileSKVIterator iiter = trf.reader.getIndex();
+    while (iiter.hasTop()) {
+      Key k = iiter.getTopKey();
+      Assert.assertTrue(k + " " + k.getSize() + " >= 20", k.getSize() < 20);
+      iiter.next();
+    }
+
+    Collections.shuffle(keys);
+
+    for (Key key : keys) {
+      trf.reader.seek(new Range(key, null), EMPTY_COL_FAMS, false);
+      Assert.assertTrue(trf.reader.hasTop());
+      Assert.assertEquals(key, trf.reader.getTopKey());
+      Assert.assertEquals(new Value((key.hashCode() + "").getBytes()), trf.reader.getTopValue());
+    }
+  }
+
+  @Test
   public void testCryptoDoesntLeakSensitive() throws IOException {
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     // test an empty file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ac7b4a9/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
index e413448..4ca4b6c 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR;
 import org.apache.accumulo.core.util.MutableByteSequence;
 import org.apache.accumulo.core.util.UnsynchronizedBuffer;
 import org.junit.Before;
@@ -178,7 +179,7 @@ public class RelativeKeyTest {
     Key currKey = null;
     MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
 
-    RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+    RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size());
     assertEquals(1, skippr.skipped);
     assertEquals(new Key(), skippr.prevKey);
     assertEquals(expectedKeys.get(0), skippr.rk.getKey());
@@ -192,7 +193,7 @@ public class RelativeKeyTest {
 
     seekKey = new Key("a", "b", "c", "d", 1);
     seekKey.setDeleted(true);
-    skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+    skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size());
     assertEquals(1, skippr.skipped);
     assertEquals(new Key(), skippr.prevKey);
     assertEquals(expectedKeys.get(0), skippr.rk.getKey());
@@ -203,13 +204,23 @@ public class RelativeKeyTest {
   }
 
   @Test(expected = EOFException.class)
+  public void testSeekAfterEverythingWrongCount() throws IOException {
+    Key seekKey = new Key("s", "t", "u", "v", 1);
+    Key prevKey = new Key();
+    Key currKey = null;
+    MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
+
+    RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size() + 1);
+  }
+
   public void testSeekAfterEverything() throws IOException {
     Key seekKey = new Key("s", "t", "u", "v", 1);
     Key prevKey = new Key();
     Key currKey = null;
     MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
 
-    RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+    SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size());
+    assertEquals(expectedKeys.size(), skippr.skipped);
   }
 
   @Test
@@ -220,7 +231,7 @@ public class RelativeKeyTest {
     Key currKey = null;
     MutableByteSequence value = new MutableByteSequence(new byte[64], 0, 0);
 
-    RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+    RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey, expectedKeys.size());
 
     assertEquals(seekIndex + 1, skippr.skipped);
     assertEquals(expectedKeys.get(seekIndex - 1), skippr.prevKey);
@@ -236,14 +247,17 @@ public class RelativeKeyTest {
     int i;
     for (i = seekIndex; expectedKeys.get(i).compareTo(fKey) < 0; i++) {}
 
-    skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, prevKey, currKey);
+    int left = expectedKeys.size();
+
+    skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, prevKey, currKey, expectedKeys.size());
     assertEquals(i + 1, skippr.skipped);
+    left -= skippr.skipped;
     assertEquals(expectedKeys.get(i - 1), skippr.prevKey);
     assertEquals(expectedKeys.get(i), skippr.rk.getKey());
     assertEquals(expectedValues.get(i).toString(), value.toString());
 
     // try fast skipping to our current location
-    skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, expectedKeys.get(i - 1), expectedKeys.get(i));
+    skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, expectedKeys.get(i - 1), expectedKeys.get(i), left);
     assertEquals(0, skippr.skipped);
     assertEquals(expectedKeys.get(i - 1), skippr.prevKey);
     assertEquals(expectedKeys.get(i), skippr.rk.getKey());
@@ -253,7 +267,7 @@ public class RelativeKeyTest {
     fKey = expectedKeys.get(i).followingKey(PartialKey.ROW_COLFAM);
     int j;
     for (j = i; expectedKeys.get(j).compareTo(fKey) < 0; j++) {}
-    skippr = RelativeKey.fastSkip(in, fKey, value, expectedKeys.get(i - 1), expectedKeys.get(i));
+    skippr = RelativeKey.fastSkip(in, fKey, value, expectedKeys.get(i - 1), expectedKeys.get(i), left);
     assertEquals(j - i, skippr.skipped);
     assertEquals(expectedKeys.get(j - 1), skippr.prevKey);
     assertEquals(expectedKeys.get(j), skippr.rk.getKey());
