Repository: accumulo
Updated Branches:
  refs/heads/1.7 d10840b8e -> d33b2a09d


ACCUMULO-4318 choose smaller keys in rfile index


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/63a8a5d7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/63a8a5d7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/63a8a5d7

Branch: refs/heads/1.7
Commit: 63a8a5d7eec7b0ab6dce107d382143562e21b527
Parents: 5b971d6
Author: Keith Turner <ktur...@apache.org>
Authored: Fri May 27 18:00:25 2016 -0400
Committer: Keith Turner <ktur...@apache.org>
Committed: Fri May 27 18:00:25 2016 -0400

----------------------------------------------------------------------
 .../apache/accumulo/core/file/rfile/RFile.java  | 32 ++++++++--
 .../accumulo/core/file/rfile/RFileTest.java     | 63 ++++++++++++++++++++
 2 files changed, 91 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/63a8a5d7/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 80d5150..2fc4e4a 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -64,6 +64,7 @@ import 
org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
 import 
org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
 import org.apache.accumulo.core.util.MutableByteSequence;
 import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.math.stat.descriptive.SummaryStatistics;
 import org.apache.hadoop.io.Writable;
 import org.apache.log4j.Logger;
 
@@ -281,13 +282,15 @@ public class RFile {
   public static class Writer implements FileSKVWriter {
 
     public static final int MAX_CF_IN_DLG = 1000;
+    private static final double MAX_BLOCK_MULTIPLIER = 1.1;
 
     private BlockFileWriter fileWriter;
     private ABlockWriter blockWriter;
 
     // private BlockAppender blockAppender;
-    private long blockSize = 100000;
-    private int indexBlockSize;
+    private final long blockSize;
+    private final long maxBlockSize;
+    private final int indexBlockSize;
     private int entries = 0;
 
     private ArrayList<LocalityGroupMetadata> localityGroups = new 
ArrayList<LocalityGroupMetadata>();
@@ -303,12 +306,16 @@ public class RFile {
 
     private HashSet<ByteSequence> previousColumnFamilies;
 
+    private SummaryStatistics keyLenStats = new SummaryStatistics();
+    private double avergageKeySize = 0;
+
     public Writer(BlockFileWriter bfw, int blockSize) throws IOException {
       this(bfw, blockSize, (int) 
AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
     }
 
     public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize) 
throws IOException {
       this.blockSize = blockSize;
+      this.maxBlockSize = (long) (blockSize * MAX_BLOCK_MULTIPLIER);
       this.indexBlockSize = indexBlockSize;
       this.fileWriter = bfw;
       this.blockWriter = null;
@@ -358,6 +365,11 @@ public class RFile {
       }
     }
 
+    private boolean isGiantKey(Key k) {
+      // consider a key that's more than 3 standard deviations from previously 
seen key sizes as giant
+      return k.getSize() > keyLenStats.getMean() + 
keyLenStats.getStandardDeviation() * 3;
+    }
+
     @Override
     public void append(Key key, Value value) throws IOException {
 
@@ -378,8 +390,18 @@ public class RFile {
       if (blockWriter == null) {
         blockWriter = fileWriter.prepareDataBlock();
       } else if (blockWriter.getRawSize() > blockSize) {
-        closeBlock(prevKey, false);
-        blockWriter = fileWriter.prepareDataBlock();
+
+        if (avergageKeySize == 0) {
+          // use the same average for the search for a below average key for a 
block
+          avergageKeySize = keyLenStats.getMean();
+        }
+
+        if ((prevKey.getSize() <= avergageKeySize || blockWriter.getRawSize() 
> maxBlockSize) && !isGiantKey(prevKey)) {
+          closeBlock(prevKey, false);
+          blockWriter = fileWriter.prepareDataBlock();
+          // set average to zero so it's recomputed for the next block
+          avergageKeySize = 0;
+        }
       }
 
       RelativeKey rk = new RelativeKey(lastKeyInBlock, key);
@@ -388,6 +410,8 @@ public class RFile {
       value.write(blockWriter);
       entries++;
 
+      keyLenStats.addValue(key.getSize());
+
       prevKey = new Key(key);
       lastKeyInBlock = prevKey;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/63a8a5d7/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 1a83f33..6a29610 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -29,6 +29,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -333,6 +334,8 @@ public class RFileTest {
             String cvS = "" + (char) cv;
             for (int ts = 4; ts > 0; ts--) {
               Key k = nk(rowS, cfS, cqS, cvS, ts);
+              // check below ensures that when all key sizes are the same, more than one 
index block is created
+              assertEquals(27, k.getSize());
               k.setDeleted(true);
               Value v = nv("" + val);
               trf.writer.append(k, v);
@@ -340,6 +343,7 @@ public class RFileTest {
               expectedValues.add(v);
 
               k = nk(rowS, cfS, cqS, cvS, ts);
+              assertEquals(27, k.getSize());
               v = nv("" + val);
               trf.writer.append(k, v);
               expectedKeys.add(k);
@@ -448,6 +452,15 @@ public class RFileTest {
       }
     }
 
+    // count the number of index entries
+    FileSKVIterator iiter = trf.reader.getIndex();
+    int count = 0;
+    while (iiter.hasTop()) {
+      count++;
+      iiter.next();
+    }
+    assertEquals(20, count);
+
     trf.closeReader();
   }
 
@@ -1751,6 +1764,56 @@ public class RFileTest {
   }
 
   @Test
+  public void testBigKeys() throws IOException {
+    // this test ensures that big keys do not end up in the index
+    ArrayList<Key> keys = new ArrayList<Key>();
+
+    for (int i = 0; i < 1000; i++) {
+      String row = String.format("r%06d", i);
+      keys.add(new Key(row, "cf1", "cq1", 42));
+    }
+
+    // add a few keys with long rows
+    for (int i = 0; i < 1000; i += 100) {
+      String row = String.format("r%06d", i);
+      char ca[] = new char[1000];
+      Arrays.fill(ca, 'b');
+      row = row + new String(ca);
+      keys.add(new Key(row, "cf1", "cq1", 42));
+    }
+
+    Collections.sort(keys);
+
+    TestRFile trf = new TestRFile(conf);
+
+    trf.openWriter();
+
+    for (Key k : keys) {
+      trf.writer.append(k, new Value((k.hashCode() + "").getBytes()));
+    }
+
+    trf.writer.close();
+
+    trf.openReader();
+
+    FileSKVIterator iiter = trf.reader.getIndex();
+    while (iiter.hasTop()) {
+      Key k = iiter.getTopKey();
+      assertTrue(k + " " + k.getSize() + " >= 20", k.getSize() < 20);
+      iiter.next();
+    }
+
+    Collections.shuffle(keys);
+
+    for (Key key : keys) {
+      trf.reader.seek(new Range(key, null), EMPTY_COL_FAMS, false);
+      assertTrue(trf.reader.hasTop());
+      assertEquals(key, trf.reader.getTopKey());
+      assertEquals(new Value((key.hashCode() + "").getBytes()), 
trf.reader.getTopValue());
+    }
+  }
+
+  @Test
   public void testCryptoDoesntLeakSensitive() throws IOException {
     conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
     // test an empty file

Reply via email to