ACCUMULO-2202 catch all runtime exceptions, not just NPE
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/47ca312c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/47ca312c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/47ca312c Branch: refs/heads/master Commit: 47ca312c950042c4b277a98cbb4eed8a37bf9afd Parents: d183132 91be551 Author: Eric Newton <eric.new...@gmail.com> Authored: Thu Jan 16 10:41:31 2014 -0500 Committer: Eric Newton <eric.new...@gmail.com> Committed: Thu Jan 16 10:41:31 2014 -0500 ---------------------------------------------------------------------- .../java/org/apache/accumulo/core/file/BloomFilterLayer.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/47ca312c/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java index 2636126,0000000..e79da37 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java +++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java @@@ -1,502 -1,0 +1,501 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.file; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.bloomfilter.DynamicBloomFilter; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.keyfunctor.KeyFunctor; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.NamingThreadFactory; +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.bloom.Key; +import org.apache.hadoop.util.hash.Hash; +import org.apache.log4j.Logger; + +/** + * A class that sits on top of different accumulo file formats and provides bloom filter functionality. + * + */ +public class BloomFilterLayer { + private static final Logger LOG = Logger.getLogger(BloomFilterLayer.class); + public static final String BLOOM_FILE_NAME = "acu_bloom"; + public static final int HASH_COUNT = 5; + + private static ExecutorService loadThreadPool = null; + + private static synchronized ExecutorService getLoadThreadPool(int maxLoadThreads) { + if (loadThreadPool != null) { + return loadThreadPool; + } + + if (maxLoadThreads > 0) { + BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(); + loadThreadPool = new ThreadPoolExecutor(0, maxLoadThreads, 60, TimeUnit.SECONDS, q, new NamingThreadFactory("bloom-loader")); + } + + return loadThreadPool; + } + + public static class Writer implements FileSKVWriter { + private DynamicBloomFilter bloomFilter; + private int numKeys; + private int vectorSize; + + private FileSKVWriter writer; + private KeyFunctor transformer = null; + private boolean closed = false; + + Writer(FileSKVWriter writer, AccumuloConfiguration acuconf) { + this.writer = writer; + initBloomFilter(acuconf); + } + + private synchronized void initBloomFilter(AccumuloConfiguration acuconf) { + + numKeys = acuconf.getCount(Property.TABLE_BLOOM_SIZE); + // vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for + // single key, where <code> is the number of hash functions, + // <code>n</code> is the number of keys and <code>c</code> is the desired + // max. error rate. + // Our desired error rate is by default 0.005, i.e. 0.5% + double errorRate = acuconf.getFraction(Property.TABLE_BLOOM_ERRORRATE); + vectorSize = (int) Math.ceil(-HASH_COUNT * numKeys / Math.log(1.0 - Math.pow(errorRate, 1.0 / HASH_COUNT))); + bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT, Hash.parseHashType(acuconf.get(Property.TABLE_BLOOM_HASHTYPE)), numKeys); + + /** + * load KeyFunctor + */ + try { + Class<? extends KeyFunctor> clazz = AccumuloVFSClassLoader.loadClass(acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR), KeyFunctor.class); + transformer = clazz.newInstance(); + + } catch (Exception e) { + LOG.error("Failed to find KeyFunctor: " + acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR), e); + throw new IllegalArgumentException("Failed to find KeyFunctor: " + acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR)); + + } + + } + + @Override + public synchronized void append(org.apache.accumulo.core.data.Key key, Value val) throws IOException { + writer.append(key, val); + Key bloomKey = transformer.transform(key); + if (bloomKey.getBytes().length > 0) + bloomFilter.add(bloomKey); + } + + @Override + public synchronized void close() throws IOException { + + if (closed) + return; + + DataOutputStream out = writer.createMetaStore(BLOOM_FILE_NAME); + out.writeUTF(transformer.getClass().getCanonicalName()); + bloomFilter.write(out); + out.flush(); + out.close(); + writer.close(); + closed = true; + } + + @Override + public DataOutputStream createMetaStore(String name) throws IOException { + return writer.createMetaStore(name); + } + + @Override + public void startDefaultLocalityGroup() throws IOException { + writer.startDefaultLocalityGroup(); + + } + + @Override + public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException { + writer.startNewLocalityGroup(name, columnFamilies); + } + + @Override + public boolean supportsLocalityGroups() { + return writer.supportsLocalityGroups(); + } + } + + static class BloomFilterLoader { + + private volatile DynamicBloomFilter bloomFilter; + private int loadRequest = 0; + private int loadThreshold = 1; + private int maxLoadThreads; + private Runnable loadTask; + private volatile KeyFunctor transformer = null; + private volatile boolean closed = false; + + BloomFilterLoader(final FileSKVIterator reader, AccumuloConfiguration acuconf) { + + maxLoadThreads = acuconf.getCount(Property.TSERV_BLOOM_LOAD_MAXCONCURRENT); + + loadThreshold = acuconf.getCount(Property.TABLE_BLOOM_LOAD_THRESHOLD); + + loadTask = new Runnable() { + public void run() { + + // no need to load the bloom filter if the map file is closed + if (closed) + return; + String ClassName = null; + DataInputStream in = null; + + try { + in = reader.getMetaStore(BLOOM_FILE_NAME); + DynamicBloomFilter tmpBloomFilter = new DynamicBloomFilter(); + + // check for closed again after open but before reading the bloom filter in + if (closed) + return; + + /** + * Load classname for keyFunctor + */ + ClassName = in.readUTF(); + + Class<? extends KeyFunctor> clazz = AccumuloVFSClassLoader.loadClass(ClassName, KeyFunctor.class); + transformer = clazz.newInstance(); + + /** + * read in bloom filter + */ + + tmpBloomFilter.readFields(in); + // only set the bloom filter after it is fully constructed + bloomFilter = tmpBloomFilter; + } catch (NoSuchMetaStoreException nsme) { + // file does not have a bloom filter, ignore it + } catch (IOException ioe) { + if (!closed) + LOG.warn("Can't open BloomFilter", ioe); + else + LOG.debug("Can't open BloomFilter, file closed : " + ioe.getMessage()); + + bloomFilter = null; + } catch (ClassNotFoundException e) { + LOG.error("Failed to find KeyFunctor in config: " + ClassName, e); + bloomFilter = null; + } catch (InstantiationException e) { + LOG.error("Could not instantiate KeyFunctor: " + ClassName, e); + bloomFilter = null; + } catch (IllegalAccessException e) { + LOG.error("Illegal acess exception", e); + bloomFilter = null; - } catch (NullPointerException npe) { ++ } catch (RuntimeException rte) { + if (!closed) - throw npe; ++ throw rte; + else - LOG.debug("Can't open BloomFilter, NPE after closed ", npe); - ++ LOG.debug("Can't open BloomFilter, RTE after closed ", rte); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + LOG.warn("Failed to close ", e); + } + } + } + } + }; + + initiateLoad(maxLoadThreads); + + } + + private synchronized void initiateLoad(int maxLoadThreads) { + // ensure only one thread initiates loading of bloom filter by + // only taking action when loadTask != null + if (loadTask != null && loadRequest >= loadThreshold) { + try { + ExecutorService ltp = getLoadThreadPool(maxLoadThreads); + if (ltp == null) { + // load the bloom filter in the foreground + loadTask.run(); + } else { + // load the bloom filter in the background + ltp.execute(new LoggingRunnable(LOG, loadTask)); + } + } finally { + // set load task to null so no one else can initiate the load + loadTask = null; + } + } + + loadRequest++; + } + + /** + * Checks if this {@link RFile} contains keys from this range. The membership test is performed using a Bloom filter, so the result has always non-zero probability of + * false positives. + * + * @param range + * range of keys to check + * @return false iff key doesn't exist, true if key probably exists. + * @throws IOException + */ + boolean probablyHasKey(Range range) throws IOException { + if (bloomFilter == null) { + initiateLoad(maxLoadThreads); + if (bloomFilter == null) + return true; + } + + Key bloomKey = transformer.transform(range); + + if (bloomKey == null || bloomKey.getBytes().length == 0) + return true; + + return bloomFilter.membershipTest(bloomKey); + } + + public void close() { + this.closed = true; + } + } + + public static class Reader implements FileSKVIterator { + + private BloomFilterLoader bfl; + private FileSKVIterator reader; + + public Reader(FileSKVIterator reader, AccumuloConfiguration acuconf) { + this.reader = reader; + bfl = new BloomFilterLoader(reader, acuconf); + } + + private Reader(FileSKVIterator src, BloomFilterLoader bfl) { + this.reader = src; + this.bfl = bfl; + } + + private boolean checkSuper = true; + + @Override + public boolean hasTop() { + return checkSuper ? reader.hasTop() : false; + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + + if (!bfl.probablyHasKey(range)) { + checkSuper = false; + } else { + reader.seek(range, columnFamilies, inclusive); + checkSuper = true; + } + } + + public synchronized void close() throws IOException { + bfl.close(); + reader.close(); + } + + @Override + public org.apache.accumulo.core.data.Key getFirstKey() throws IOException { + return reader.getFirstKey(); + } + + @Override + public org.apache.accumulo.core.data.Key getLastKey() throws IOException { + return reader.getLastKey(); + } + + @Override + public SortedKeyValueIterator<org.apache.accumulo.core.data.Key,Value> deepCopy(IteratorEnvironment env) { + return new BloomFilterLayer.Reader((FileSKVIterator) reader.deepCopy(env), bfl); + } + + @Override + public org.apache.accumulo.core.data.Key getTopKey() { + return reader.getTopKey(); + } + + @Override + public Value getTopValue() { + return reader.getTopValue(); + } + + @Override + public void init(SortedKeyValueIterator<org.apache.accumulo.core.data.Key,Value> source, Map<String,String> options, IteratorEnvironment env) + throws IOException { + throw new UnsupportedOperationException(); + + } + + @Override + public void next() throws IOException { + reader.next(); + } + + @Override + public DataInputStream getMetaStore(String name) throws IOException { + return reader.getMetaStore(name); + } + + @Override + public void closeDeepCopies() throws IOException { + reader.closeDeepCopies(); + } + + @Override + public void setInterruptFlag(AtomicBoolean flag) { + reader.setInterruptFlag(flag); + } + + } + + public static void main(String[] args) throws IOException { + PrintStream out = System.out; + + Random r = new Random(); + + HashSet<Integer> valsSet = new HashSet<Integer>(); + + for (int i = 0; i < 100000; i++) { + valsSet.add(r.nextInt(Integer.MAX_VALUE)); + } + + ArrayList<Integer> vals = new ArrayList<Integer>(valsSet); + Collections.sort(vals); + + ConfigurationCopy acuconf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration()); + acuconf.set(Property.TABLE_BLOOM_ENABLED, "true"); + acuconf.set(Property.TABLE_BLOOM_KEY_FUNCTOR, "accumulo.core.file.keyfunctor.ColumnFamilyFunctor"); + acuconf.set(Property.TABLE_FILE_TYPE, RFile.EXTENSION); + acuconf.set(Property.TABLE_BLOOM_LOAD_THRESHOLD, "1"); + acuconf.set(Property.TSERV_BLOOM_LOAD_MAXCONCURRENT, "1"); + + Configuration conf = CachedConfiguration.getInstance(); + FileSystem fs = FileSystem.get(conf); + + String suffix = FileOperations.getNewFileExtension(acuconf); + String fname = "/tmp/test." + suffix; + FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, acuconf); + + long t1 = System.currentTimeMillis(); + + bmfw.startDefaultLocalityGroup(); + + for (Integer i : vals) { + String fi = String.format("%010d", i); + bmfw.append(new org.apache.accumulo.core.data.Key(new Text("r" + fi), new Text("cf1")), new Value(("v" + fi).getBytes())); + bmfw.append(new org.apache.accumulo.core.data.Key(new Text("r" + fi), new Text("cf2")), new Value(("v" + fi).getBytes())); + } + + long t2 = System.currentTimeMillis(); + + out.printf("write rate %6.2f%n", vals.size() / ((t2 - t1) / 1000.0)); + + bmfw.close(); + + t1 = System.currentTimeMillis(); + FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, acuconf); + t2 = System.currentTimeMillis(); + out.println("Opened " + fname + " in " + (t2 - t1)); + + t1 = System.currentTimeMillis(); + + int hits = 0; + for (int i = 0; i < 5000; i++) { + int row = r.nextInt(Integer.MAX_VALUE); + String fi = String.format("%010d", row); + // bmfr.seek(new Range(new Text("r"+fi))); + org.apache.accumulo.core.data.Key k1 = new org.apache.accumulo.core.data.Key(new Text("r" + fi), new Text("cf1")); + bmfr.seek(new Range(k1, true, k1.followingKey(PartialKey.ROW_COLFAM), false), new ArrayList<ByteSequence>(), false); + if (valsSet.contains(row)) { + hits++; + if (!bmfr.hasTop()) { + out.println("ERROR " + row); + } + } + } + + t2 = System.currentTimeMillis(); + + out.printf("random lookup rate : %6.2f%n", 5000 / ((t2 - t1) / 1000.0)); + out.println("hits = " + hits); + + int count = 0; + + t1 = System.currentTimeMillis(); + + for (Integer row : valsSet) { + String fi = String.format("%010d", row); + // bmfr.seek(new Range(new Text("r"+fi))); + + org.apache.accumulo.core.data.Key k1 = new org.apache.accumulo.core.data.Key(new Text("r" + fi), new Text("cf1")); + bmfr.seek(new Range(k1, true, k1.followingKey(PartialKey.ROW_COLFAM), false), new ArrayList<ByteSequence>(), false); + + if (!bmfr.hasTop()) { + out.println("ERROR 2 " + row); + } + + count++; + + if (count >= 500) { + break; + } + } + + t2 = System.currentTimeMillis(); + + out.printf("existant lookup rate %6.2f%n", 500 / ((t2 - t1) / 1000.0)); + out.println("expected hits 500. Receive hits: " + count); + bmfr.close(); + } +}