http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java index f78ae66..61e6b5c 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java +++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java @@ -64,44 +64,44 @@ import org.slf4j.LoggerFactory; /** * A class that sits on top of different accumulo file formats and provides bloom filter functionality. - * + * */ public class BloomFilterLayer { private static final Logger LOG = LoggerFactory.getLogger(BloomFilterLayer.class); public static final String BLOOM_FILE_NAME = "acu_bloom"; public static final int HASH_COUNT = 5; - + private static ExecutorService loadThreadPool = null; - + private static synchronized ExecutorService getLoadThreadPool(int maxLoadThreads) { if (loadThreadPool != null) { return loadThreadPool; } - + if (maxLoadThreads > 0) { BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(); loadThreadPool = new ThreadPoolExecutor(0, maxLoadThreads, 60, TimeUnit.SECONDS, q, new NamingThreadFactory("bloom-loader")); } - + return loadThreadPool; } - + public static class Writer implements FileSKVWriter { private DynamicBloomFilter bloomFilter; private int numKeys; private int vectorSize; - + private FileSKVWriter writer; private KeyFunctor transformer = null; private boolean closed = false; - + Writer(FileSKVWriter writer, AccumuloConfiguration acuconf) { this.writer = writer; initBloomFilter(acuconf); } - + private synchronized void initBloomFilter(AccumuloConfiguration acuconf) { - + numKeys = acuconf.getCount(Property.TABLE_BLOOM_SIZE); // vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for // single key, where <code> is the number of hash functions, @@ -111,7 +111,7 @@ public class BloomFilterLayer { double errorRate = acuconf.getFraction(Property.TABLE_BLOOM_ERRORRATE); vectorSize = (int) Math.ceil(-HASH_COUNT * numKeys / Math.log(1.0 - Math.pow(errorRate, 1.0 / HASH_COUNT))); bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT, Hash.parseHashType(acuconf.get(Property.TABLE_BLOOM_HASHTYPE)), numKeys); - + /** * load KeyFunctor */ @@ -125,15 +125,15 @@ public class BloomFilterLayer { clazz = AccumuloVFSClassLoader.loadClass(classname, KeyFunctor.class); transformer = clazz.newInstance(); - + } catch (Exception e) { LOG.error("Failed to find KeyFunctor: " + acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR), e); throw new IllegalArgumentException("Failed to find KeyFunctor: " + acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR)); - + } - + } - + @Override public synchronized void append(org.apache.accumulo.core.data.Key key, Value val) throws IOException { writer.append(key, val); @@ -141,13 +141,13 @@ public class BloomFilterLayer { if (bloomKey.getBytes().length > 0) bloomFilter.add(bloomKey); } - + @Override public synchronized void close() throws IOException { - + if (closed) return; - + DataOutputStream out = writer.createMetaStore(BLOOM_FILE_NAME); out.writeUTF(transformer.getClass().getName()); bloomFilter.write(out); @@ -156,31 +156,31 @@ public class BloomFilterLayer { writer.close(); closed = true; } - + @Override public DataOutputStream createMetaStore(String name) throws IOException { return writer.createMetaStore(name); } - + @Override public void startDefaultLocalityGroup() throws IOException { writer.startDefaultLocalityGroup(); - + } - + @Override public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException { writer.startNewLocalityGroup(name, columnFamilies); } - + @Override public boolean supportsLocalityGroups() { return writer.supportsLocalityGroups(); } } - + static class BloomFilterLoader { - + private volatile DynamicBloomFilter bloomFilter; private int loadRequest = 0; private int loadThreshold = 1; @@ -188,33 +188,33 @@ public class BloomFilterLayer { private Runnable loadTask; private volatile KeyFunctor transformer = null; private volatile boolean closed = false; - + BloomFilterLoader(final FileSKVIterator reader, AccumuloConfiguration acuconf) { - + maxLoadThreads = acuconf.getCount(Property.TSERV_BLOOM_LOAD_MAXCONCURRENT); - + loadThreshold = acuconf.getCount(Property.TABLE_BLOOM_LOAD_THRESHOLD); - + final String context = acuconf.get(Property.TABLE_CLASSPATH); loadTask = new Runnable() { @Override public void run() { - + // no need to load the bloom filter if the map file is closed if (closed) return; String ClassName = null; DataInputStream in = null; - + try { in = reader.getMetaStore(BLOOM_FILE_NAME); DynamicBloomFilter tmpBloomFilter = new DynamicBloomFilter(); - + // check for closed again after open but before reading the bloom filter in if (closed) return; - + /** * Load classname for keyFunctor */ @@ -226,11 +226,11 @@ public class BloomFilterLayer { else clazz = AccumuloVFSClassLoader.loadClass(ClassName, KeyFunctor.class); transformer = clazz.newInstance(); - + /** * read in bloom filter */ - + tmpBloomFilter.readFields(in); // only set the bloom filter after it is fully constructed bloomFilter = tmpBloomFilter; @@ -241,7 +241,7 @@ public class BloomFilterLayer { LOG.warn("Can't open BloomFilter", ioe); else LOG.debug("Can't open BloomFilter, file closed : " + ioe.getMessage()); - + bloomFilter = null; } catch (ClassNotFoundException e) { LOG.error("Failed to find KeyFunctor in config: " + ClassName, e); @@ -268,11 +268,11 @@ public class BloomFilterLayer { } } }; - + initiateLoad(maxLoadThreads); - + } - + private synchronized void initiateLoad(int maxLoadThreads) { // ensure only one thread initiates loading of bloom filter by // only taking action when loadTask != null @@ -291,14 +291,14 @@ public class BloomFilterLayer { loadTask = null; } } - + loadRequest++; } - + /** * Checks if this {@link RFile} contains keys from this range. The membership test is performed using a Bloom filter, so the result has always non-zero * probability of false positives. - * + * * @param range * range of keys to check * @return false iff key doesn't exist, true if key probably exists. @@ -309,45 +309,45 @@ public class BloomFilterLayer { if (bloomFilter == null) return true; } - + Key bloomKey = transformer.transform(range); - + if (bloomKey == null || bloomKey.getBytes().length == 0) return true; - + return bloomFilter.membershipTest(bloomKey); } - + public void close() { this.closed = true; } } - + public static class Reader implements FileSKVIterator { - + private BloomFilterLoader bfl; private FileSKVIterator reader; - + public Reader(FileSKVIterator reader, AccumuloConfiguration acuconf) { this.reader = reader; bfl = new BloomFilterLoader(reader, acuconf); } - + private Reader(FileSKVIterator src, BloomFilterLoader bfl) { this.reader = src; this.bfl = bfl; } - + private boolean checkSuper = true; - + @Override public boolean hasTop() { return checkSuper ? reader.hasTop() : false; } - + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { - + if (!bfl.probablyHasKey(range)) { checkSuper = false; } else { @@ -355,118 +355,118 @@ public class BloomFilterLayer { checkSuper = true; } } - + @Override public synchronized void close() throws IOException { bfl.close(); reader.close(); } - + @Override public org.apache.accumulo.core.data.Key getFirstKey() throws IOException { return reader.getFirstKey(); } - + @Override public org.apache.accumulo.core.data.Key getLastKey() throws IOException { return reader.getLastKey(); } - + @Override public SortedKeyValueIterator<org.apache.accumulo.core.data.Key,Value> deepCopy(IteratorEnvironment env) { return new BloomFilterLayer.Reader((FileSKVIterator) reader.deepCopy(env), bfl); } - + @Override public org.apache.accumulo.core.data.Key getTopKey() { return reader.getTopKey(); } - + @Override public Value getTopValue() { return reader.getTopValue(); } - + @Override public void init(SortedKeyValueIterator<org.apache.accumulo.core.data.Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { throw new UnsupportedOperationException(); - + } - + @Override public void next() throws IOException { reader.next(); } - + @Override public DataInputStream getMetaStore(String name) throws IOException { return reader.getMetaStore(name); } - + @Override public void closeDeepCopies() throws IOException { reader.closeDeepCopies(); } - + @Override public void setInterruptFlag(AtomicBoolean flag) { reader.setInterruptFlag(flag); } - + } - + public static void main(String[] args) throws IOException { PrintStream out = System.out; - + Random r = new Random(); - + HashSet<Integer> valsSet = new HashSet<Integer>(); - + for (int i = 0; i < 100000; i++) { valsSet.add(r.nextInt(Integer.MAX_VALUE)); } - + ArrayList<Integer> vals = new ArrayList<Integer>(valsSet); Collections.sort(vals); - + ConfigurationCopy acuconf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration()); acuconf.set(Property.TABLE_BLOOM_ENABLED, "true"); acuconf.set(Property.TABLE_BLOOM_KEY_FUNCTOR, "accumulo.core.file.keyfunctor.ColumnFamilyFunctor"); acuconf.set(Property.TABLE_FILE_TYPE, RFile.EXTENSION); acuconf.set(Property.TABLE_BLOOM_LOAD_THRESHOLD, "1"); acuconf.set(Property.TSERV_BLOOM_LOAD_MAXCONCURRENT, "1"); - + Configuration conf = CachedConfiguration.getInstance(); FileSystem fs = FileSystem.get(conf); - + String suffix = FileOperations.getNewFileExtension(acuconf); String fname = "/tmp/test." + suffix; FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, acuconf); - + long t1 = System.currentTimeMillis(); - + bmfw.startDefaultLocalityGroup(); - + for (Integer i : vals) { String fi = String.format("%010d", i); bmfw.append(new org.apache.accumulo.core.data.Key(new Text("r" + fi), new Text("cf1")), new Value(("v" + fi).getBytes(UTF_8))); bmfw.append(new org.apache.accumulo.core.data.Key(new Text("r" + fi), new Text("cf2")), new Value(("v" + fi).getBytes(UTF_8))); } - + long t2 = System.currentTimeMillis(); - + out.printf("write rate %6.2f%n", vals.size() / ((t2 - t1) / 1000.0)); - + bmfw.close(); - + t1 = System.currentTimeMillis(); FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, acuconf); t2 = System.currentTimeMillis(); out.println("Opened " + fname + " in " + (t2 - t1)); - + t1 = System.currentTimeMillis(); - + int hits = 0; for (int i = 0; i < 5000; i++) { int row = r.nextInt(Integer.MAX_VALUE); @@ -481,36 +481,36 @@ public class BloomFilterLayer { } } } - + t2 = System.currentTimeMillis(); - + out.printf("random lookup rate : %6.2f%n", 5000 / ((t2 - t1) / 1000.0)); out.println("hits = " + hits); - + int count = 0; - + t1 = System.currentTimeMillis(); - + for (Integer row : valsSet) { String fi = String.format("%010d", row); // bmfr.seek(new Range(new Text("r"+fi))); - + org.apache.accumulo.core.data.Key k1 = new org.apache.accumulo.core.data.Key(new Text("r" + fi), new Text("cf1")); bmfr.seek(new Range(k1, true, k1.followingKey(PartialKey.ROW_COLFAM), false), new ArrayList<ByteSequence>(), false); - + if (!bmfr.hasTop()) { out.println("ERROR 2 " + row); } - + count++; - + if (count >= 500) { break; } } - + t2 = System.currentTimeMillis(); - + out.printf("existant lookup rate %6.2f%n", 500 / ((t2 - t1) / 1000.0)); out.println("expected hits 500. Receive hits: " + count); bmfr.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java index 17e540b..78d0407 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java @@ -35,23 +35,23 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; class DispatchingFileFactory extends FileOperations { - + private FileOperations findFileFactory(String file) { - + Path p = new Path(file); String name = p.getName(); - + if (name.startsWith(Constants.MAPFILE_EXTENSION + "_")) { return new MapFileOperations(); } String[] sp = name.split("\\."); - + if (sp.length < 2) { throw new IllegalArgumentException("File name " + name + " has no extension"); } - + String extension = sp[sp.length - 1]; - + if (extension.equals(Constants.MAPFILE_EXTENSION) || extension.equals(Constants.MAPFILE_EXTENSION + "_tmp")) { return new MapFileOperations(); } else if (extension.equals(RFile.EXTENSION) || extension.equals(RFile.EXTENSION + "_tmp")) { @@ -60,12 +60,12 @@ class DispatchingFileFactory extends FileOperations { throw new IllegalArgumentException("File type " + extension + " not supported"); } } - + @Override public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { return findFileFactory(file).openIndex(file, fs, conf, acuconf, null, null); } - + @Override public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, null, null); @@ -74,7 +74,7 @@ class DispatchingFileFactory extends FileOperations { } return iter; } - + @Override public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, acuconf); @@ -83,107 +83,107 @@ class DispatchingFileFactory extends FileOperations { } return writer; } - + @Override public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { return findFileFactory(file).getFileSize(file, fs, conf, acuconf); } - + @Override public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, AccumuloConfiguration tableConf) throws IOException { return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, null, null); } - + @Override public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException { - + if (!tableConf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) indexCache = null; if (!tableConf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) dataCache = null; - + return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, dataCache, indexCache); } - + @Override public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException { - + if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) indexCache = null; if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) dataCache = null; - + FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, dataCache, indexCache); if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) { return new BloomFilterLayer.Reader(iter, acuconf); } return iter; } - + @Override public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache) throws IOException { - + if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) iCache = null; if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) dCache = null; - + return findFileFactory(file).openIndex(file, fs, conf, acuconf, dCache, iCache); } - + } public abstract class FileOperations { - + private static final HashSet<String> validExtensions = new HashSet<String>(Arrays.asList(Constants.MAPFILE_EXTENSION, RFile.EXTENSION)); - + public static Set<String> getValidExtensions() { return validExtensions; } - + public static String getNewFileExtension(AccumuloConfiguration acuconf) { return acuconf.get(Property.TABLE_FILE_TYPE); } - + public static FileOperations getInstance() { return new DispatchingFileFactory(); } - + /** * Open a reader that will not be seeked giving an initial seek location. This is useful for file operations that only need to scan data within a range and do * not need to seek. Therefore file metadata such as indexes does not need to be kept in memory while the file is scanned. Also seek optimizations like bloom * filters do not need to be loaded. - * + * */ - + public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, AccumuloConfiguration tableConf) throws IOException; - + public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException; - + /** * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters. - * + * */ - + public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException; - + public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException; - + public abstract FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException; - + public abstract FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException; - + public abstract FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache) throws IOException; - + public abstract long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException; - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java b/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java index 2de5cfc..60970e2 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java @@ -24,12 +24,12 @@ import org.apache.accumulo.core.iterators.system.InterruptibleIterator; public interface FileSKVIterator extends InterruptibleIterator { Key getFirstKey() throws IOException; - + Key getLastKey() throws IOException; - + DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException; - + void closeDeepCopies() throws IOException; - + void close() throws IOException; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/FileSKVWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileSKVWriter.java b/core/src/main/java/org/apache/accumulo/core/file/FileSKVWriter.java index 8718515..f4aa888 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileSKVWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileSKVWriter.java @@ -26,14 +26,14 @@ import org.apache.accumulo.core.data.Value; public interface FileSKVWriter { boolean supportsLocalityGroups(); - + void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException; - + void startDefaultLocalityGroup() throws IOException; - + void append(Key key, Value value) throws IOException; - + DataOutputStream createMetaStore(String name) throws IOException; - + void close() throws IOException; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/NoSuchMetaStoreException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/NoSuchMetaStoreException.java b/core/src/main/java/org/apache/accumulo/core/file/NoSuchMetaStoreException.java index 7de78b7..a7b9801 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/NoSuchMetaStoreException.java +++ b/core/src/main/java/org/apache/accumulo/core/file/NoSuchMetaStoreException.java @@ -19,18 +19,15 @@ package org.apache.accumulo.core.file; import java.io.IOException; public class NoSuchMetaStoreException extends IOException { - + public NoSuchMetaStoreException(String msg, Throwable e) { super(msg, e); } - + public NoSuchMetaStoreException(String msg) { super(msg); } - - /** - * - */ + private static final long serialVersionUID = 1L; - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java index 592d325..8df2469 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java @@ -21,30 +21,31 @@ import java.io.DataInputStream; import java.io.IOException; /* - * Minimal interface to read a block from a + * Minimal interface to read a block from a * block based file - * + * */ public interface ABlockReader extends DataInput { - + long getRawSize(); - + DataInputStream getStream() throws IOException; - + void close() throws IOException; - + /** * An indexable block supports seeking, getting a position, and associating an arbitrary index with the block - * + * * @return true, if the block is indexable; otherwise false. */ boolean isIndexable(); void seek(int position); - /** Get the file position. - + /** + * Get the file position. + * * @return the file position. */ int getPosition(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockWriter.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockWriter.java index 19b6f0c..ece0a5e 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockWriter.java @@ -21,21 +21,21 @@ import java.io.DataOutputStream; import java.io.IOException; /* - * Minimal interface to write a block to a + * Minimal interface to write a block to a * block based file - * + * */ public interface ABlockWriter extends DataOutput { - + long getCompressedSize() throws IOException; - + void close() throws IOException; - + long getRawSize() throws IOException; - + long getStartPos() throws IOException; - + DataOutputStream getStream() throws IOException; - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileReader.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileReader.java index 2c918aa..6d2014a 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileReader.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileReader.java @@ -19,23 +19,23 @@ package org.apache.accumulo.core.file.blockfile; import java.io.IOException; /** - * + * * Provides a generic interface for a Reader for a BlockBaseFile format. Supports the minimal interface required. - * + * * Read a metaBlock and a dataBlock - * + * */ public interface BlockFileReader { - + ABlockReader getMetaBlock(String name) throws IOException; - + ABlockReader getDataBlock(int blockIndex) throws IOException; - + void close() throws IOException; - + ABlockReader getMetaBlock(long offset, long compressedSize, long rawSize) throws IOException; - + ABlockReader getDataBlock(long offset, long compressedSize, long rawSize) throws IOException; - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileWriter.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileWriter.java index cf86006..3bdbea3 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileWriter.java @@ -19,20 +19,20 @@ package org.apache.accumulo.core.file.blockfile; import java.io.IOException; /** - * + * * Provides a generic interface for a Writer for a BlockBaseFile format. Supports the minimal interface required. - * + * * Write a metaBlock and a dataBlock. - * + * */ public interface BlockFileWriter { - + ABlockWriter prepareMetaBlock(String name, String compressionName) throws IOException; - + ABlockWriter prepareMetaBlock(String name) throws IOException; - + ABlockWriter prepareDataBlock() throws IOException; - + void close() throws IOException; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java index 8673385..a6c08ff 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java @@ -25,7 +25,7 @@ package org.apache.accumulo.core.file.blockfile.cache; public interface BlockCache { /** * Add block to cache. - * + * * @param blockName * Zero-based file block number. * @param buf @@ -34,31 +34,31 @@ public interface BlockCache { * Whether block should be treated as in-memory */ CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory); - + /** * Add block to cache (defaults to not in-memory). - * + * * @param blockName * Zero-based file block number. * @param buf * The block contents wrapped in a ByteBuffer. */ CacheEntry cacheBlock(String blockName, byte buf[]); - + /** * Fetch block from cache. - * + * * @param blockName * Block number to fetch. * @return Block or null if block is not in the cache. */ CacheEntry getBlock(String blockName); - + /** * Shutdown the cache. */ void shutdown(); - + /** * Get the maximum size of this cache. * http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java index 0e628d3..2faf696 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java @@ -18,9 +18,9 @@ package org.apache.accumulo.core.file.blockfile.cache; public interface CacheEntry { byte[] getBuffer(); - + Object getIndex(); - + void setIndex(Object idx); - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java index 4ff0d22..b6d6d41 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java @@ -19,19 +19,18 @@ */ package org.apache.accumulo.core.file.blockfile.cache; - /** * Represents an entry in the {@link LruBlockCache}. - * + * * <p> * Makes the block memory-aware with {@link HeapSize} and Comparable to sort by access time for the LRU. It also takes care of priority by either instantiating * as in-memory or handling the transition from single to multiple access. */ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntry { - + public final static long PER_BLOCK_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * SizeConstants.SIZEOF_LONG) + ClassSize.STRING + ClassSize.BYTE_BUFFER); - + static enum BlockPriority { /** * Accessed a single time (used for scan-resistance) @@ -46,18 +45,18 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr */ MEMORY }; - + private final String blockName; private final byte buf[]; private volatile long accessTime; private long size; private BlockPriority priority; private Object index; - + public CachedBlock(String blockName, byte buf[], long accessTime) { this(blockName, buf, accessTime, false); } - + public CachedBlock(String blockName, byte buf[], long accessTime, boolean inMemory) { this.blockName = blockName; this.buf = buf; @@ -69,7 +68,7 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr this.priority = BlockPriority.SINGLE; } } - + /** * Block has been accessed. Update its local access time. */ @@ -79,35 +78,35 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr this.priority = BlockPriority.MULTI; } } - + public long heapSize() { return size; } - + public int compareTo(CachedBlock that) { if (this.accessTime == that.accessTime) return 0; return this.accessTime < that.accessTime ? 1 : -1; } - + @Override public byte[] getBuffer() { return this.buf; } - + public String getName() { return this.blockName; } - + public BlockPriority getPriority() { return this.priority; } - + @Override public Object getIndex() { return index; } - + @Override public void setIndex(Object idx) { this.index = idx; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlockQueue.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlockQueue.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlockQueue.java index d281d65..d08fee1 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlockQueue.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlockQueue.java @@ -25,21 +25,21 @@ import java.util.PriorityQueue; /** * A memory-bound queue that will grow until an element brings total size >= maxSize. From then on, only entries that are sorted larger than the smallest * current entry will be inserted/replaced. - * + * * <p> * Use this when you want to find the largest elements (according to their ordering, not their heap size) that consume as close to the specified maxSize as * possible. Default behavior is to grow just above rather than just below specified max. - * + * * <p> * Object used in this queue must implement {@link HeapSize} as well as {@link Comparable}. */ public class CachedBlockQueue implements HeapSize { - + private PriorityQueue<CachedBlock> queue; - + private long heapSize; private long maxSize; - + /** * @param maxSize * the target size of elements in the queue @@ -54,14 +54,14 @@ public class CachedBlockQueue implements HeapSize { heapSize = 0; this.maxSize = maxSize; } - + /** * Attempt to add the specified cached block to this queue. - * + * * <p> * If the queue is smaller than the max size, or if the specified element is ordered before the smallest element in the queue, the element will be added to * the queue. Otherwise, there is no side effect of this call. - * + * * @param cb * block to try to add to the queue */ @@ -83,10 +83,10 @@ public class CachedBlockQueue implements HeapSize { } } } - + /** * Get a sorted List of all elements in this queue, in descending order. - * + * * @return list of cached elements in descending order */ public CachedBlock[] get() { @@ -96,10 +96,10 @@ public class CachedBlockQueue implements HeapSize { } return blocks.toArray(new CachedBlock[blocks.size()]); } - + /** * Get a sorted List of all elements in this queue, in descending order. - * + * * @return list of cached elements in descending order */ public LinkedList<CachedBlock> getList() { @@ -109,10 +109,10 @@ public class CachedBlockQueue implements HeapSize { } return blocks; } - + /** * Total size of all elements in this queue. - * + * * @return size of all elements currently in queue, in bytes */ public long heapSize() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/ClassSize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/ClassSize.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/ClassSize.java index 8abf425..2d7586f 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/ClassSize.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/ClassSize.java @@ -29,76 +29,76 @@ import org.apache.commons.logging.LogFactory; /** * Class for determining the "size" of a class, an attempt to calculate the actual bytes that an object of this class will occupy in memory - * + * * The core of this class is taken from the Derby project */ public class ClassSize { static final Log LOG = LogFactory.getLog(ClassSize.class); - + private static int nrOfRefsPerObj = 2; - + /** Array overhead */ public static int ARRAY = 0; - + /** Overhead for ArrayList(0) */ public static int ARRAYLIST = 0; - + /** Overhead for ByteBuffer */ public static int BYTE_BUFFER = 0; - + /** Overhead for an Integer */ public static int INTEGER = 0; - + /** Overhead for entry in map */ public static int MAP_ENTRY = 0; - + /** Object overhead is minimum 2 * reference size (8 bytes on 64-bit) */ public static int OBJECT = 0; - + /** Reference size is 8 bytes on 64-bit, 4 bytes on 32-bit */ public static int REFERENCE = 0; - + /** String overhead */ public static int STRING = 0; - + /** Overhead for TreeMap */ public static int TREEMAP = 0; - + /** Overhead for ConcurrentHashMap */ public static int CONCURRENT_HASHMAP = 0; - + /** Overhead for ConcurrentHashMap.Entry */ public static int CONCURRENT_HASHMAP_ENTRY = 0; - + /** Overhead for ConcurrentHashMap.Segment */ public static int CONCURRENT_HASHMAP_SEGMENT = 0; - + /** Overhead for ConcurrentSkipListMap */ public static int CONCURRENT_SKIPLISTMAP = 0; - + /** Overhead for ConcurrentSkipListMap Entry */ public static int CONCURRENT_SKIPLISTMAP_ENTRY = 0; - + /** Overhead for ReentrantReadWriteLock */ public static int REENTRANT_LOCK = 0; - + /** Overhead for AtomicLong */ public static int ATOMIC_LONG = 0; - + /** Overhead for AtomicInteger */ public static int ATOMIC_INTEGER = 0; - + /** Overhead for AtomicBoolean */ public static int ATOMIC_BOOLEAN = 0; - + /** Overhead for CopyOnWriteArraySet */ public static int COPYONWRITE_ARRAYSET = 0; - + /** Overhead for CopyOnWriteArrayList */ public static int COPYONWRITE_ARRAYLIST = 0; - + private static final String THIRTY_TWO = "32"; - + /** * Method for reading the arc settings and setting overheads according to 32-bit or 64-bit architecture. */ @@ -106,60 +106,60 @@ public class ClassSize { // Figure out whether this is a 32 or 64 bit machine. Properties sysProps = System.getProperties(); String arcModel = sysProps.getProperty("sun.arch.data.model"); - + // Default value is set to 8, covering the case when arcModel is unknown REFERENCE = 8; if (arcModel.equals(THIRTY_TWO)) { REFERENCE = 4; } - + OBJECT = 2 * REFERENCE; - + ARRAY = 3 * REFERENCE; - + ARRAYLIST = align(OBJECT + align(REFERENCE) + align(ARRAY) + (2 * SizeConstants.SIZEOF_INT)); - + BYTE_BUFFER = align(OBJECT + align(REFERENCE) + align(ARRAY) + (5 * SizeConstants.SIZEOF_INT) + (3 * SizeConstants.SIZEOF_BOOLEAN) + SizeConstants.SIZEOF_LONG); - + INTEGER = align(OBJECT + SizeConstants.SIZEOF_INT); - + MAP_ENTRY = align(OBJECT + 5 * REFERENCE + SizeConstants.SIZEOF_BOOLEAN); - + TREEMAP = align(OBJECT + (2 * SizeConstants.SIZEOF_INT) + align(7 * REFERENCE)); - + STRING = align(OBJECT + ARRAY + REFERENCE + 3 * SizeConstants.SIZEOF_INT); - + CONCURRENT_HASHMAP = align((2 * SizeConstants.SIZEOF_INT) + ARRAY + (6 * REFERENCE) + OBJECT); - + CONCURRENT_HASHMAP_ENTRY = align(REFERENCE + OBJECT + (3 * REFERENCE) + (2 * SizeConstants.SIZEOF_INT)); - + CONCURRENT_HASHMAP_SEGMENT = align(REFERENCE + OBJECT + (3 * SizeConstants.SIZEOF_INT) + SizeConstants.SIZEOF_FLOAT + ARRAY); - + CONCURRENT_SKIPLISTMAP = align(SizeConstants.SIZEOF_INT + OBJECT + (8 * REFERENCE)); - + CONCURRENT_SKIPLISTMAP_ENTRY = align(align(OBJECT + (3 * REFERENCE)) + /* one node per entry */ align((OBJECT + (3 * REFERENCE)) / 2)); /* one index per two entries */ - + REENTRANT_LOCK = align(OBJECT + (3 * REFERENCE)); - + ATOMIC_LONG = align(OBJECT + SizeConstants.SIZEOF_LONG); - + ATOMIC_INTEGER = align(OBJECT + SizeConstants.SIZEOF_INT); - + ATOMIC_BOOLEAN = align(OBJECT + SizeConstants.SIZEOF_BOOLEAN); - + COPYONWRITE_ARRAYSET = align(OBJECT + REFERENCE); - + COPYONWRITE_ARRAYLIST = align(OBJECT + (2 * REFERENCE) + ARRAY); } - + /** * The estimate of the size of a class instance depends on whether the JVM uses 32 or 64 bit addresses, that is it depends on the size of an object reference. * It is a linear function of the size of a reference, e.g. 24 + 5*r where r is the size of a reference (usually 4 or 8 bytes). - * + * * This method returns the coefficients of the linear function, e.g. {24, 5} in the above example. - * + * * @param cl * A class whose instance size is to be estimated * @return an array of 3 integers. The first integer is the size of the primitives, the second the number of arrays and the third the number of references. @@ -169,7 +169,7 @@ public class ClassSize { int arrays = 0; // The number of references that a new object takes int references = nrOfRefsPerObj; - + for (; null != cl; cl = cl.getSuperclass()) { Field[] field = cl.getDeclaredFields(); if (null != field) { @@ -183,7 +183,7 @@ public class ClassSize { references++; } else {// Is simple primitive String name = fieldClass.getName(); - + if (name.equals("int") || name.equals("I")) primitives += SizeConstants.SIZEOF_INT; else if (name.equals("long") || name.equals("J")) @@ -213,18 +213,18 @@ public class ClassSize { } return new int[] {primitives, arrays, references}; } - + /** * Estimate the static space taken up by a class instance given the coefficients returned by getSizeCoefficients. - * + * * @param coeff * the coefficients - * + * * @return the size estimate, in bytes */ private static long estimateBaseFromCoefficients(int[] coeff, boolean debug) { long size = coeff[0] + align(coeff[1] * ARRAY) + coeff[2] * REFERENCE; - + // Round up to a multiple of 8 size = align(size); if (debug) { @@ -236,21 +236,21 @@ public class ClassSize { } return size; } - + /** * Estimate the static space taken up by the fields of a class. This includes the space taken up by by references (the pointer) but not by the referenced * object. So the estimated size of an array field does not depend on the size of the array. Similarly the size of an object (reference) field does not depend * on the object. - * + * * @return the size estimate in bytes. */ public static long estimateBase(Class<?> cl, boolean debug) { return estimateBaseFromCoefficients(getSizeCoefficients(cl, debug), debug); } - + /** * Aligns a number to 8. - * + * * @param num * number to align to 8 * @return smallest number >= input that is a multiple of 8 @@ -258,10 +258,10 @@ public class ClassSize { public static int align(int num) { return (int) (align((long) num)); } - + /** * Aligns a number to 8. - * + * * @param num * number to align to 8 * @return smallest number >= input that is a multiple of 8 @@ -271,5 +271,5 @@ public class ClassSize { // stored and sent together return ((num + 7) >> 3) << 3; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/HeapSize.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/HeapSize.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/HeapSize.java index ca2402e..04db3cd 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/HeapSize.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/HeapSize.java @@ -27,10 +27,10 @@ package org.apache.accumulo.core.file.blockfile.cache; * An Object's size is determined by the non-static data members in it, as well as the fixed {@link Object} overhead. * <p> * For example: - * + * * <pre> * public class SampleObject implements HeapSize { - * + * * int[] numbers; * int x; * } @@ -41,5 +41,5 @@ public interface HeapSize { * @return Approximate 'exclusive deep size' of implementing object. Includes count of payload and hosting object sizings. */ long heapSize(); - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java index 83f363e..6cab164 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java @@ -36,111 +36,111 @@ import org.apache.commons.logging.LogFactory; * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an LRU eviction algorithm, and concurrent: backed by a * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} operations. * <p> - * + * * Contains three levels of block priority to allow for scan-resistance and in-memory families. A block is added with an inMemory flag if necessary, otherwise a * block becomes a single access priority. Once a blocked is accessed again, it changes to multiple access. This is used to prevent scans from thrashing the * cache, adding a least-frequently-used element to the eviction algorithm. * <p> - * + * * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each priority will retain close to its maximum size, however, if * any priority is not using its entire chunk the others are able to grow beyond their chunk size. * <p> - * + * * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The block size is not especially important as this cache is * fully dynamic in its sizing of blocks. It is only used for pre-allocating data structures and in initial heap estimation of the map. * <p> - * + * * The detailed constructor defines the sizes for the three priorities (they should total to the maximum size defined). It also sets the levels that trigger and * control the eviction thread. * <p> - * + * * The acceptable size is the cache size level which triggers the eviction process to start. It evicts enough blocks to get the size below the minimum size * specified. * <p> - * + * * Eviction happens in a separate thread and involves a single full-scan of the map. It determines how many bytes must be freed to reach the minimum size, and * then while scanning determines the fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times bytes to free). It then * uses the priority chunk sizes to evict fairly according to the relative sizes and usage. */ public class LruBlockCache implements BlockCache, HeapSize { - + static final Log LOG = LogFactory.getLog(LruBlockCache.class); - + /** Default Configuration Parameters */ - + /** Backing Concurrent Map Configuration */ static final float DEFAULT_LOAD_FACTOR = 0.75f; static final int DEFAULT_CONCURRENCY_LEVEL = 16; - + /** Eviction thresholds */ static final float DEFAULT_MIN_FACTOR = 0.75f; static final float DEFAULT_ACCEPTABLE_FACTOR = 0.85f; - + /** Priority buckets */ static final float DEFAULT_SINGLE_FACTOR = 0.25f; static final float DEFAULT_MULTI_FACTOR = 0.50f; static final float DEFAULT_MEMORY_FACTOR = 0.25f; - + /** Statistics thread */ static final int statThreadPeriod = 60; - + /** Concurrent map (the cache) */ private final ConcurrentHashMap<String,CachedBlock> map; - + /** Eviction lock (locked when eviction in process) */ private final ReentrantLock evictionLock = new ReentrantLock(true); - + /** Volatile boolean to track if we are in an eviction process or not */ private volatile boolean evictionInProgress = false; - + /** Eviction thread */ private final EvictionThread evictionThread; - + /** Statistics thread schedule pool (for heavy debugging, could remove) */ private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats")); - + /** Current size of cache */ private final AtomicLong size; - + /** Current number of cached elements */ private final AtomicLong elements; - + /** Cache access count (sequential ID) */ private final AtomicLong count; - + /** Cache statistics */ private final CacheStats stats; - + /** Maximum allowable size of cache (block put if size > max, evict) */ private long maxSize; - + /** Approximate block size */ private long blockSize; - + /** Acceptable size of cache (no evictions if size < acceptable) */ private float acceptableFactor; - + /** Minimum threshold of cache (when evicting, evict until size < min) */ private float minFactor; - + /** Single access bucket size */ private float singleFactor; - + /** Multiple access bucket size */ private float multiFactor; - + /** In-memory bucket size */ private float memoryFactor; - + /** Overhead of the structure itself */ private long overhead; - + /** * Default constructor. Specify maximum size and expected average block size (approximation is fine). - * + * * <p> * All other factors will be calculated based on defaults specified in this class. - * + * * @param maxSize * maximum size of cache, in bytes * @param blockSize @@ -149,7 +149,7 @@ public class LruBlockCache implements BlockCache, HeapSize { public LruBlockCache(long maxSize, long blockSize) { this(maxSize, blockSize, true); } - + /** * Constructor used for testing. Allows disabling of the eviction thread. */ @@ -157,10 +157,10 @@ public class LruBlockCache implements BlockCache, HeapSize { this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR, DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR); } - + /** * Configurable constructor. Use this constructor if not using defaults. - * + * * @param maxSize * maximum size of this cache, in bytes * @param blockSize @@ -208,7 +208,7 @@ public class LruBlockCache implements BlockCache, HeapSize { this.elements = new AtomicLong(0); this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); this.size = new AtomicLong(this.overhead); - + if (evictionThread) { this.evictionThread = new EvictionThread(this); this.evictionThread.start(); @@ -224,22 +224,22 @@ public class LruBlockCache implements BlockCache, HeapSize { } this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); } - + public void setMaxSize(long maxSize) { this.maxSize = maxSize; if (this.size.get() > acceptableSize() && !evictionInProgress) { runEviction(); } } - + // BlockCache implementation - + /** * Cache the block with the specified name and buffer. * <p> * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a * race condition and will update the buffer but not modify the size of the cache. - * + * * @param blockName * block name * @param buf @@ -252,7 +252,7 @@ public class LruBlockCache implements BlockCache, HeapSize { if (cb != null) { stats.duplicateReads(); cb.access(count.incrementAndGet()); - + } else { cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory); long newSize = size.addAndGet(cb.heapSize()); @@ -262,16 +262,16 @@ public class LruBlockCache implements BlockCache, HeapSize { runEviction(); } } - + return cb; } - + /** * Cache the block with the specified name and buffer. * <p> * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a * race condition and will update the buffer but not modify the size of the cache. - * + * * @param blockName * block name * @param buf @@ -280,15 +280,15 @@ public class LruBlockCache implements BlockCache, HeapSize { public CacheEntry cacheBlock(String blockName, byte buf[]) { return cacheBlock(blockName, buf, false); } - + /** * Get the buffer of the block with the specified name. - * + * * @param blockName * block name * @return buffer of specified block name, or null if not in cache */ - + public CachedBlock getBlock(String blockName) { CachedBlock cb = map.get(blockName); if (cb == null) { @@ -299,7 +299,7 @@ public class LruBlockCache implements BlockCache, HeapSize { cb.access(count.incrementAndGet()); return cb; } - + protected long evictBlock(CachedBlock block) { map.remove(block.getName()); size.addAndGet(-1 * block.heapSize()); @@ -307,7 +307,7 @@ public class LruBlockCache implements BlockCache, HeapSize { stats.evicted(); return block.heapSize(); } - + /** * Multi-threaded call to run the eviction process. */ @@ -318,31 +318,31 @@ public class LruBlockCache implements BlockCache, HeapSize { evictionThread.evict(); } } - + /** * Eviction method. */ void evict() { - + // Ensure only one eviction at a time if (!evictionLock.tryLock()) return; - + try { evictionInProgress = true; - + long bytesToFree = size.get() - minSize(); - + LOG.debug("Block cache LRU eviction started. Attempting to free " + bytesToFree + " bytes"); - + if (bytesToFree <= 0) return; - + // Instantiate priority buckets BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize()); BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize()); BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize()); - + // Scan entire map putting into appropriate buckets for (CachedBlock cachedBlock : map.values()) { switch (cachedBlock.getPriority()) { @@ -360,16 +360,16 @@ public class LruBlockCache implements BlockCache, HeapSize { } } } - + PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<BlockBucket>(3); - + bucketQueue.add(bucketSingle); bucketQueue.add(bucketMulti); bucketQueue.add(bucketMemory); - + int remainingBuckets = 3; long bytesFreed = 0; - + BlockBucket bucket; while ((bucket = bucketQueue.poll()) != null) { long overflow = bucket.overflow(); @@ -379,43 +379,43 @@ public class LruBlockCache implements BlockCache, HeapSize { } remainingBuckets--; } - + float singleMB = ((float) bucketSingle.totalSize()) / ((float) (1024 * 1024)); float multiMB = ((float) bucketMulti.totalSize()) / ((float) (1024 * 1024)); float memoryMB = ((float) bucketMemory.totalSize()) / ((float) (1024 * 1024)); - + LOG.debug("Block cache LRU eviction completed. " + "Freed " + bytesFreed + " bytes. " + "Priority Sizes: " + "Single=" + singleMB + "MB (" + bucketSingle.totalSize() + "), " + "Multi=" + multiMB + "MB (" + bucketMulti.totalSize() + ")," + "Memory=" + memoryMB + "MB (" + bucketMemory.totalSize() + ")"); - + } finally { stats.evict(); evictionInProgress = false; evictionLock.unlock(); } } - + /** * Used to group blocks into priority buckets. There will be a BlockBucket for each priority (single, multi, memory). Once bucketed, the eviction algorithm * takes the appropriate number of elements out of each according to configuration parameters and their relatives sizes. */ private class BlockBucket implements Comparable<BlockBucket> { - + private CachedBlockQueue queue; private long totalSize = 0; private long bucketSize; - + public BlockBucket(long bytesToFree, long blockSize, long bucketSize) { this.bucketSize = bucketSize; queue = new CachedBlockQueue(bytesToFree, blockSize); totalSize = 0; } - + public void add(CachedBlock block) { totalSize += block.heapSize(); queue.add(block); } - + public long free(long toFree) { CachedBlock[] blocks = queue.get(); long freedBytes = 0; @@ -427,99 +427,99 @@ public class LruBlockCache implements BlockCache, HeapSize { } return freedBytes; } - + public long overflow() { return totalSize - bucketSize; } - + public long totalSize() { return totalSize; } - + @Override public int compareTo(BlockBucket that) { if (this.overflow() == that.overflow()) return 0; return this.overflow() > that.overflow() ? 1 : -1; } - + @Override public boolean equals(Object that) { if (that instanceof BlockBucket) - return compareTo((BlockBucket)that) == 0; + return compareTo((BlockBucket) that) == 0; return false; } } - + /** * Get the maximum size of this cache. - * + * * @return max size in bytes */ public long getMaxSize() { return this.maxSize; } - + /** * Get the current size of this cache. - * + * * @return current size in bytes */ public long getCurrentSize() { return this.size.get(); } - + /** * Get the current size of this cache. - * + * * @return current size in bytes */ public long getFreeSize() { return getMaxSize() - getCurrentSize(); } - + /** * Get the size of this cache (number of cached blocks) - * + * * @return number of cached blocks */ public long size() { return this.elements.get(); } - + /** * Get the number of eviction runs that have occurred */ public long getEvictionCount() { return this.stats.getEvictionCount(); } - + /** * Get the number of blocks that have been evicted during the lifetime of this cache. */ public long getEvictedCount() { return this.stats.getEvictedCount(); } - + /* * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows above the acceptable level.<p> - * + * * Thread is triggered into action by {@link LruBlockCache#runEviction()} */ private static class EvictionThread extends Thread { private WeakReference<LruBlockCache> cache; private boolean running = false; - + public EvictionThread(LruBlockCache cache) { super("LruBlockCache.EvictionThread"); setDaemon(true); this.cache = new WeakReference<LruBlockCache>(cache); } - + public synchronized boolean running() { return running; } - + @Override public void run() { while (true) { @@ -535,32 +535,32 @@ public class LruBlockCache implements BlockCache, HeapSize { cache.evict(); } } - + public void evict() { synchronized (this) { this.notify(); } } } - + /* * Statistics thread. Periodically prints the cache statistics to the log. */ private static class StatisticsThread extends Thread { LruBlockCache lru; - + public StatisticsThread(LruBlockCache lru) { super("LruBlockCache.StatisticsThread"); setDaemon(true); this.lru = lru; } - + @Override public void run() { lru.logStats(); } } - + public void logStats() { // Log size long totalSize = heapSize(); @@ -574,17 +574,17 @@ public class LruBlockCache implements BlockCache, HeapSize { + "Hit Ratio=" + stats.getHitRatio() * 100 + "%, " + "Miss Ratio=" + stats.getMissRatio() * 100 + "%, " + "Evicted/Run=" + stats.evictedPerEviction() + ", " + "Duplicate Reads=" + stats.getDuplicateReads()); } - + /** * Get counter statistics for this cache. - * + * * <p> * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes. */ public CacheStats getStats() { return this.stats; } - + public static class CacheStats { private final AtomicLong accessCount = new AtomicLong(0); private final AtomicLong hitCount = new AtomicLong(0); @@ -592,101 +592,101 @@ public class LruBlockCache implements BlockCache, HeapSize { private final AtomicLong evictionCount = new AtomicLong(0); private final AtomicLong evictedCount = new AtomicLong(0); private final AtomicLong duplicateReads = new AtomicLong(0); - + public void miss() { missCount.incrementAndGet(); accessCount.incrementAndGet(); } - + public void hit() { hitCount.incrementAndGet(); accessCount.incrementAndGet(); } - + public void evict() { evictionCount.incrementAndGet(); } - + public void duplicateReads() { duplicateReads.incrementAndGet(); } - + public void evicted() { evictedCount.incrementAndGet(); } - + public long getRequestCount() { return accessCount.get(); } - + public long getMissCount() { return missCount.get(); } - + public long getHitCount() { return hitCount.get(); } - + public long getEvictionCount() { return evictionCount.get(); } - + public long getDuplicateReads() { return duplicateReads.get(); } - + public long getEvictedCount() { return evictedCount.get(); } - + public double getHitRatio() { return ((float) getHitCount() / (float) getRequestCount()); } - + public double getMissRatio() { return ((float) getMissCount() / (float) getRequestCount()); } - + public double evictedPerEviction() { return (float) ((float) getEvictedCount() / (float) getEvictionCount()); } } - + public final static long CACHE_FIXED_OVERHEAD = ClassSize.align((3 * SizeConstants.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) + (5 * SizeConstants.SIZEOF_FLOAT) + SizeConstants.SIZEOF_BOOLEAN + ClassSize.OBJECT); - + // HeapSize implementation public long heapSize() { return getCurrentSize(); } - + public static long calculateOverhead(long maxSize, long blockSize, int concurrency) { return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + ((int) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY) + (concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT); } - + // Simple calculators of sizes given factors and maxSize - + private long acceptableSize() { return (long) Math.floor(this.maxSize * this.acceptableFactor); } - + private long minSize() { return (long) Math.floor(this.maxSize * this.minFactor); } - + private long singleSize() { return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor); } - + private long multiSize() { return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor); } - + private long memorySize() { return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor); } - + public void shutdown() { this.scheduleThreadPool.shutdown(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SizeConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SizeConstants.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SizeConstants.java index f3e483f..e5004cd 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SizeConstants.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SizeConstants.java @@ -17,42 +17,42 @@ package org.apache.accumulo.core.file.blockfile.cache; public class SizeConstants { - + public static final int SIZEOF_BOOLEAN = Byte.SIZE / Byte.SIZE; - + /** * Size of byte in bytes */ public static final int SIZEOF_BYTE = SIZEOF_BOOLEAN; - + /** * Size of char in bytes */ public static final int SIZEOF_CHAR = Character.SIZE / Byte.SIZE; - + /** * Size of double in bytes */ public static final int SIZEOF_DOUBLE = Double.SIZE / Byte.SIZE; - + /** * Size of float in bytes */ public static final int SIZEOF_FLOAT = Float.SIZE / Byte.SIZE; - + /** * Size of int in bytes */ public static final int SIZEOF_INT = Integer.SIZE / Byte.SIZE; - + /** * Size of long in bytes */ public static final int SIZEOF_LONG = Long.SIZE / Byte.SIZE; - + /** * Size of short in bytes */ public static final int SIZEOF_SHORT = Short.SIZE / Byte.SIZE; - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index 095a218..4d65c9f 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -41,105 +41,105 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; /*** - * + * * This is a wrapper class for BCFile that includes a cache for independent caches for datablocks and metadatablocks */ public class CachableBlockFile { - + private CachableBlockFile() {}; - + private static final Logger log = Logger.getLogger(CachableBlockFile.class); - + public static class Writer implements BlockFileWriter { private BCFile.Writer _bc; private BlockWrite _bw; private FSDataOutputStream fsout = null; - + public Writer(FileSystem fs, Path fName, String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException { this.fsout = fs.create(fName); init(fsout, compressAlgor, conf, accumuloConfiguration); } - + public Writer(FSDataOutputStream fsout, String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException { this.fsout = fsout; init(fsout, compressAlgor, conf, accumuloConfiguration); } - + private void init(FSDataOutputStream fsout, String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException { _bc = new BCFile.Writer(fsout, compressAlgor, conf, false, accumuloConfiguration); } - + public ABlockWriter prepareMetaBlock(String name) throws IOException { _bw = new BlockWrite(_bc.prepareMetaBlock(name)); return _bw; } - + public ABlockWriter prepareMetaBlock(String name, String compressionName) throws IOException { _bw = new BlockWrite(_bc.prepareMetaBlock(name, compressionName)); return _bw; } - + public ABlockWriter prepareDataBlock() throws IOException { _bw = new BlockWrite(_bc.prepareDataBlock()); return _bw; } - + public void close() throws IOException { - + _bw.close(); _bc.close(); - + if (this.fsout != null) { this.fsout.close(); } - + } - + } - + public static class BlockWrite extends DataOutputStream implements ABlockWriter { BlockAppender _ba; - + public BlockWrite(BlockAppender ba) { super(ba); this._ba = ba; }; - + @Override public long getCompressedSize() throws IOException { return _ba.getCompressedSize(); } - + @Override public long getRawSize() throws IOException { return _ba.getRawSize(); } - + @Override public void close() throws IOException { - + _ba.close(); } - + @Override public DataOutputStream getStream() throws IOException { - + return this; } - + @Override public long getStartPos() throws IOException { return _ba.getStartPos(); - } - + } + } - + /** - * - * + * + * * Class wraps the BCFile reader. - * + * */ public static class Reader implements BlockFileReader { private BCFile.Reader _bc; @@ -151,83 +151,84 @@ public class CachableBlockFile { private Configuration conf; private boolean closed = false; private AccumuloConfiguration accumuloConfiguration = null; - + private interface BlockLoader { BlockReader get() throws IOException; - + String getInfo(); } - + private class OffsetBlockLoader implements BlockLoader { - + private int blockIndex; - + OffsetBlockLoader(int blockIndex) { this.blockIndex = blockIndex; } - + @Override public BlockReader get() throws IOException { return getBCFile(accumuloConfiguration).getDataBlock(blockIndex); } - + @Override public String getInfo() { return "" + blockIndex; } - + } - + private class RawBlockLoader implements BlockLoader { - + private long offset; private long compressedSize; private long rawSize; - + RawBlockLoader(long offset, long compressedSize, long rawSize) { this.offset = offset; this.compressedSize = compressedSize; this.rawSize = rawSize; } - + @Override public BlockReader get() throws IOException { return getBCFile(accumuloConfiguration).getDataBlock(offset, compressedSize, rawSize); } - + @Override public String getInfo() { return "" + offset + "," + compressedSize + "," + rawSize; } } - + private class MetaBlockLoader implements BlockLoader { - + private String name; private AccumuloConfiguration accumuloConfiguration; - + MetaBlockLoader(String name, AccumuloConfiguration accumuloConfiguration) { this.name = name; this.accumuloConfiguration = accumuloConfiguration; } - + @Override public BlockReader get() throws IOException { return getBCFile(accumuloConfiguration).getMetaBlock(name); } - + @Override public String getInfo() { return name; } } - - public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException { - + + public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration) + throws IOException { + /* * Grab path create input stream grab len create file */ - + fileName = dataFile.toString(); this._dCache = data; this._iCache = index; @@ -235,8 +236,9 @@ public class CachableBlockFile { this.conf = conf; this.accumuloConfiguration = accumuloConfiguration; } - - public Reader(FSDataInputStream fsin, long len, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException { + + public Reader(FSDataInputStream fsin, long len, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration) + throws IOException { this._dCache = data; this._iCache = index; init(fsin, len, conf, accumuloConfiguration); @@ -246,50 +248,50 @@ public class CachableBlockFile { // this.fin = fsin; init(fsin, len, conf, accumuloConfiguration); } - + private void init(FSDataInputStream fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException { this._bc = new BCFile.Reader(this, fsin, len, conf, accumuloConfiguration); } - + private synchronized BCFile.Reader getBCFile(AccumuloConfiguration accumuloConfiguration) throws IOException { if (closed) throw new IllegalStateException("File " + fileName + " is closed"); - + if (_bc == null) { // lazily open file if needed Path path = new Path(fileName); fin = fs.open(path); init(fin, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration); } - + return _bc; } - + public BlockRead getCachedMetaBlock(String blockName) throws IOException { String _lookup = fileName + "M" + blockName; - + if (_iCache != null) { CacheEntry cacheEntry = _iCache.getBlock(_lookup); - + if (cacheEntry != null) { return new CachedBlockRead(cacheEntry, cacheEntry.getBuffer()); } - + } - + return null; } - + public BlockRead cacheMetaBlock(String blockName, BlockReader _currBlock) throws IOException { String _lookup = fileName + "M" + blockName; return cacheBlock(_lookup, _iCache, _currBlock, blockName); } - + public void cacheMetaBlock(String blockName, byte[] b) { - + if (_iCache == null) return; - + String _lookup = fileName + "M" + blockName; try { _iCache.cacheBlock(_lookup, b); @@ -297,42 +299,42 @@ public class CachableBlockFile { log.warn("Already cached block: " + _lookup, e); } } - + private BlockRead getBlock(String _lookup, BlockCache cache, BlockLoader loader) throws IOException { - + BlockReader _currBlock; - + if (cache != null) { CacheEntry cb = null; cb = cache.getBlock(_lookup); - + if (cb != null) { return new CachedBlockRead(cb, cb.getBuffer()); } - + } /** * grab the currBlock at this point the block is still in the data stream - * + * */ _currBlock = loader.get(); - + /** * If the block is bigger than the cache just return the stream */ return cacheBlock(_lookup, cache, _currBlock, loader.getInfo()); - + } - + private BlockRead cacheBlock(String _lookup, BlockCache cache, BlockReader _currBlock, String block) throws IOException { - + if ((cache == null) || (_currBlock.getRawSize() > cache.getMaxSize())) { return new BlockRead(_currBlock, _currBlock.getRawSize()); } else { - + /** * Try to fully read block for meta data if error try to close file - * + * */ byte b[] = null; try { @@ -344,26 +346,26 @@ public class CachableBlockFile { } finally { _currBlock.close(); } - + CacheEntry ce = null; try { ce = cache.cacheBlock(_lookup, b); } catch (Exception e) { log.warn("Already cached block: " + _lookup, e); } - + if (ce == null) return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length); else return new CachedBlockRead(ce, ce.getBuffer()); - + } } - + /** * It is intended that once the BlockRead object is returned to the caller, that the caller will read the entire block and then call close on the BlockRead * class. - * + * * NOTE: In the case of multi-read threads: This method can do redundant work where an entry is read from disk and other threads check the cache before it * has been inserted. */ @@ -371,102 +373,102 @@ public class CachableBlockFile { String _lookup = this.fileName + "M" + blockName; return getBlock(_lookup, _iCache, new MetaBlockLoader(blockName, accumuloConfiguration)); } - + @Override public ABlockReader getMetaBlock(long offset, long compressedSize, long rawSize) throws IOException { String _lookup = this.fileName + "R" + offset; return getBlock(_lookup, _iCache, new RawBlockLoader(offset, compressedSize, rawSize)); } - + /** * It is intended that once the BlockRead object is returned to the caller, that the caller will read the entire block and then call close on the BlockRead * class. - * + * * NOTE: In the case of multi-read threads: This method can do redundant work where an entry is read from disk and other threads check the cache before it * has been inserted. */ - + public BlockRead getDataBlock(int blockIndex) throws IOException { String _lookup = this.fileName + "O" + blockIndex; return getBlock(_lookup, _dCache, new OffsetBlockLoader(blockIndex)); - + } - + @Override public ABlockReader getDataBlock(long offset, long compressedSize, long rawSize) throws IOException { String _lookup = this.fileName + "R" + offset; return getBlock(_lookup, _dCache, new RawBlockLoader(offset, compressedSize, rawSize)); } - + public synchronized void close() throws IOException { if (closed) return; - + closed = true; - + if (_bc != null) _bc.close(); - + if (fin != null) { fin.close(); } } - + } - + static class SeekableByteArrayInputStream extends ByteArrayInputStream { - + public SeekableByteArrayInputStream(byte[] buf) { super(buf); } - + public SeekableByteArrayInputStream(byte buf[], int offset, int length) { super(buf, offset, length); throw new UnsupportedOperationException("Seek code assumes offset is zero"); // do not need this constructor, documenting that seek will not work - // unless offset it kept track of + // unless offset it kept track of } - + public void seek(int position) { if (pos < 0 || pos >= buf.length) throw new IllegalArgumentException("pos = " + pos + " buf.lenght = " + buf.length); this.pos = position; } - + public int getPosition() { return this.pos; } - + } public static class CachedBlockRead extends BlockRead { private SeekableByteArrayInputStream seekableInput; private final CacheEntry cb; - + public CachedBlockRead(CacheEntry cb, byte buf[]) { this(new SeekableByteArrayInputStream(buf), buf.length, cb); } - + private CachedBlockRead(SeekableByteArrayInputStream seekableInput, long size, CacheEntry cb) { - super(seekableInput, size); - this.seekableInput = seekableInput; - this.cb = cb; - } + super(seekableInput, size); + this.seekableInput = seekableInput; + this.cb = cb; + } @Override public void seek(int position) { seekableInput.seek(position); } - + @Override public int getPosition() { return seekableInput.getPosition(); } - + @Override public boolean isIndexable() { return true; } - + @Override public <T> T getIndex(Class<T> clazz) { T bi = null; @@ -475,7 +477,7 @@ public class CachableBlockFile { SoftReference<T> softRef = (SoftReference<T>) cb.getIndex(); if (softRef != null) bi = softRef.get(); - + if (bi == null) { try { bi = clazz.newInstance(); @@ -485,33 +487,33 @@ public class CachableBlockFile { cb.setIndex(new SoftReference<T>(bi)); } } - + return bi; } } /** - * + * * Class provides functionality to read one block from the underlying BCFile Since We are caching blocks in the Reader class as bytearrays, this class will * wrap a DataInputStream(ByteArrayStream(cachedBlock)). - * - * + * + * */ public static class BlockRead extends DataInputStream implements ABlockReader { private long size; - + public BlockRead(InputStream in, long size) { super(in); this.size = size; } - + /** * Size is the size of the bytearray that was read form the cache */ public long getRawSize() { return size; } - + /** * It is intended that the caller of this method will close the stream we also only intend that this be called once per BlockRead. This method is provide * for methods up stream that expect to receive a DataInputStream object. @@ -520,26 +522,26 @@ public class CachableBlockFile { public DataInputStream getStream() throws IOException { return this; } - + @Override public boolean isIndexable() { return false; } - + @Override public void seek(int position) { throw new UnsupportedOperationException(); } - + @Override public int getPosition() { throw new UnsupportedOperationException(); } - + @Override public <T> T getIndex(Class<T> clazz) { throw new UnsupportedOperationException(); } - + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnFamilyFunctor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnFamilyFunctor.java b/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnFamilyFunctor.java index e200b44..3660291 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnFamilyFunctor.java +++ b/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnFamilyFunctor.java @@ -22,23 +22,23 @@ import org.apache.accumulo.core.data.Range; import org.apache.hadoop.util.bloom.Key; public class ColumnFamilyFunctor implements KeyFunctor { - + public static final PartialKey kDepth = PartialKey.ROW_COLFAM; - + @Override public Key transform(org.apache.accumulo.core.data.Key acuKey) { - + byte keyData[]; - + ByteSequence row = acuKey.getRowData(); ByteSequence cf = acuKey.getColumnFamilyData(); keyData = new byte[row.length() + cf.length()]; System.arraycopy(row.getBackingArray(), row.offset(), keyData, 0, row.length()); System.arraycopy(cf.getBackingArray(), cf.offset(), keyData, row.length(), cf.length()); - + return new Key(keyData, 1.0); } - + @Override public Key transform(Range range) { if (RowFunctor.isRangeInBloomFilter(range, PartialKey.ROW_COLFAM)) { @@ -46,5 +46,5 @@ public class ColumnFamilyFunctor implements KeyFunctor { } return null; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnQualifierFunctor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnQualifierFunctor.java b/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnQualifierFunctor.java index 7456486..d759a8a 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnQualifierFunctor.java +++ b/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnQualifierFunctor.java @@ -22,11 +22,11 @@ import org.apache.accumulo.core.data.Range; import org.apache.hadoop.util.bloom.Key; public class ColumnQualifierFunctor implements KeyFunctor { - + @Override public org.apache.hadoop.util.bloom.Key transform(org.apache.accumulo.core.data.Key acuKey) { byte keyData[]; - + ByteSequence row = acuKey.getRowData(); ByteSequence cf = acuKey.getColumnFamilyData(); ByteSequence cq = acuKey.getColumnQualifierData(); @@ -34,10 +34,10 @@ public class ColumnQualifierFunctor implements KeyFunctor { System.arraycopy(row.getBackingArray(), row.offset(), keyData, 0, row.length()); System.arraycopy(cf.getBackingArray(), cf.offset(), keyData, row.length(), cf.length()); System.arraycopy(cq.getBackingArray(), cq.offset(), keyData, row.length() + cf.length(), cq.length()); - + return new org.apache.hadoop.util.bloom.Key(keyData, 1.0); } - + @Override public Key transform(Range range) { if (RowFunctor.isRangeInBloomFilter(range, PartialKey.ROW_COLFAM_COLQUAL)) { @@ -45,5 +45,5 @@ public class ColumnQualifierFunctor implements KeyFunctor { } return null; } - + }