Repository: accumulo Updated Branches: refs/heads/master d37f05f65 -> d6b1e9a95
ACCUMULO-2836: Added context classpath support to BloomFilter, AggregatingIterator, and TableLoadBalancer Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c9c3fdd4 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c9c3fdd4 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c9c3fdd4 Branch: refs/heads/master Commit: c9c3fdd4f5fc648ccc5e3a905236a82e88524251 Parents: eb6b325 Author: Dave Marion <dlmar...@hotmail.com> Authored: Fri May 23 22:31:23 2014 -0400 Committer: Dave Marion <dlmar...@hotmail.com> Committed: Fri May 23 22:31:23 2014 -0400 ---------------------------------------------------------------------- .../accumulo/core/file/BloomFilterLayer.java | 19 ++++++++++++++++--- .../core/iterators/AggregatingIterator.java | 6 +++++- .../aggregation/conf/AggregatorSet.java | 3 ++- .../iterators/conf/ColumnToClassMapping.java | 16 +++++++++++++--- .../master/balancer/TableLoadBalancer.java | 9 ++++++++- .../java/org/apache/accumulo/master/Master.java | 14 ++++++++++++++ 6 files changed, 58 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/c9c3fdd4/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java index 5829ce6..d0e736c 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java +++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java @@ -114,7 +114,14 @@ public class BloomFilterLayer { * load KeyFunctor */ try { - Class<? extends KeyFunctor> clazz = AccumuloVFSClassLoader.loadClass(acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR), KeyFunctor.class); + String context = acuconf.get(Property.TABLE_CLASSPATH); + String classname = acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR); + Class<? extends KeyFunctor> clazz; + if (context != null && !context.equals("")) + clazz = AccumuloVFSClassLoader.getContextManager().loadClass(context, classname, KeyFunctor.class); + else + clazz = AccumuloVFSClassLoader.loadClass(classname, KeyFunctor.class); + transformer = clazz.newInstance(); } catch (Exception e) { @@ -186,6 +193,8 @@ public class BloomFilterLayer { loadThreshold = acuconf.getCount(Property.TABLE_BLOOM_LOAD_THRESHOLD); + final String context = acuconf.get(Property.TABLE_CLASSPATH); + loadTask = new Runnable() { @Override public void run() { @@ -208,8 +217,12 @@ public class BloomFilterLayer { * Load classname for keyFunctor */ ClassName = in.readUTF(); - - Class<? extends KeyFunctor> clazz = AccumuloVFSClassLoader.loadClass(ClassName, KeyFunctor.class); + + Class<? extends KeyFunctor> clazz; + if (context != null && !context.equals("")) + clazz = AccumuloVFSClassLoader.getContextManager().loadClass(context, ClassName, KeyFunctor.class); + else + clazz = AccumuloVFSClassLoader.loadClass(ClassName, KeyFunctor.class); transformer = clazz.newInstance(); /** http://git-wip-us.apache.org/repos/asf/accumulo/blob/c9c3fdd4/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java index c5c034e..9b89b47 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.Map; import java.util.Map.Entry; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; @@ -167,7 +168,10 @@ public class AggregatingIterator implements SortedKeyValueIterator<Key,Value>, O this.iterator = source; try { - this.aggregators = new ColumnToClassMapping<Aggregator>(options, Aggregator.class); + String context = null; + if (null != env) + context = env.getConfig().get(Property.TABLE_CLASSPATH); + this.aggregators = new ColumnToClassMapping<Aggregator>(options, Aggregator.class, context); } catch (ClassNotFoundException e) { log.error(e.toString()); throw new IllegalArgumentException(e); http://git-wip-us.apache.org/repos/asf/accumulo/blob/c9c3fdd4/core/src/main/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorSet.java b/core/src/main/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorSet.java index afa7587..ad33fa2 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorSet.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorSet.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.core.iterators.aggregation.conf; +import java.io.IOException; import java.util.Map; import org.apache.accumulo.core.data.Key; @@ -27,7 +28,7 @@ import org.apache.accumulo.core.iterators.conf.ColumnToClassMapping; */ @Deprecated public class AggregatorSet extends ColumnToClassMapping<Aggregator> { - public AggregatorSet(Map<String,String> opts) throws InstantiationException, IllegalAccessException, ClassNotFoundException { + public AggregatorSet(Map<String,String> opts) throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException { super(opts, Aggregator.class); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c9c3fdd4/core/src/main/java/org/apache/accumulo/core/iterators/conf/ColumnToClassMapping.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/conf/ColumnToClassMapping.java b/core/src/main/java/org/apache/accumulo/core/iterators/conf/ColumnToClassMapping.java index c835b9d..97f242b 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/conf/ColumnToClassMapping.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/conf/ColumnToClassMapping.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.core.iterators.conf; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -41,8 +42,13 @@ public class ColumnToClassMapping<K> { } public ColumnToClassMapping(Map<String,String> objectStrings, Class<? extends K> c) throws InstantiationException, IllegalAccessException, - ClassNotFoundException { - this(); + ClassNotFoundException, IOException { + this(objectStrings, c, null); + } + + public ColumnToClassMapping(Map<String,String> objectStrings, Class<? extends K> c, String context) throws InstantiationException, IllegalAccessException, + ClassNotFoundException, IOException { + this(); for (Entry<String,String> entry : objectStrings.entrySet()) { String column = entry.getKey(); @@ -50,7 +56,11 @@ public class ColumnToClassMapping<K> { Pair<Text,Text> pcic = ColumnSet.decodeColumns(column); - Class<? extends K> clazz = AccumuloVFSClassLoader.loadClass(className, c); + Class<? extends K> clazz; + if (context != null && !context.equals("")) + clazz = (Class<? extends K>) AccumuloVFSClassLoader.getContextManager().getClassLoader(context).loadClass(className); + else + clazz = AccumuloVFSClassLoader.loadClass(className, c); if (pcic.getSecond() == null) { addObject(pcic.getFirst(), clazz.newInstance()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/c9c3fdd4/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java index f2478b1..d96f9b0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java @@ -46,7 +46,14 @@ public class TableLoadBalancer extends TabletBalancer { Map<String,TabletBalancer> perTableBalancers = new HashMap<String,TabletBalancer>(); private TabletBalancer constructNewBalancerForTable(String clazzName, String table) throws Exception { - Class<? extends TabletBalancer> clazz = AccumuloVFSClassLoader.loadClass(clazzName, TabletBalancer.class); + String context = null; + if (null != configuration) + context = configuration.getTableConfiguration(table).get(Property.TABLE_CLASSPATH); + Class<? extends TabletBalancer> clazz; + if (context != null && !context.equals("")) + clazz = AccumuloVFSClassLoader.getContextManager().loadClass(context, clazzName, TabletBalancer.class); + else + clazz = AccumuloVFSClassLoader.loadClass(clazzName, TabletBalancer.class); Constructor<? extends TabletBalancer> constructor = clazz.getConstructor(String.class); return constructor.newInstance(table); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c9c3fdd4/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 2440ee4..797e066 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -115,6 +115,8 @@ import org.apache.accumulo.server.util.TServerUtils.ServerAddress; import org.apache.accumulo.server.util.time.SimpleTimer; import org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; +import org.apache.accumulo.start.classloader.vfs.ContextManager; import org.apache.accumulo.trace.instrument.thrift.TraceWrap; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; @@ -477,6 +479,18 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt tserverSet = new LiveTServerSet(instance, config.getConfiguration(), this); this.tabletBalancer = aconf.instantiateClassProperty(Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer()); this.tabletBalancer.init(serverConfig); + + try { + AccumuloVFSClassLoader.getContextManager().setContextConfig(new ContextManager.DefaultContextsConfig(new Iterable<Entry<String,String>>() { + @Override + public Iterator<Entry<String,String>> iterator() { + return getSystemConfiguration().iterator(); + } + })); + } catch (IOException e) { + throw new RuntimeException(e); + } + } public TServerConnection getConnection(TServerInstance server) {