ACCUMULO-3183 support loading compaction strategy from per-table classpath
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/43f787d6 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/43f787d6 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/43f787d6 Branch: refs/heads/master Commit: 43f787d68012badd4c980be2d36195a8ce56bf40 Parents: a65f123 Author: Keith Turner <ktur...@apache.org> Authored: Fri Oct 3 14:28:33 2014 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Fri Oct 3 14:28:33 2014 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 25 ++++++++++++-- .../org/apache/accumulo/tserver/Tablet.java | 2 +- .../tserver/TabletServerResourceManager.java | 2 +- .../functional/ConfigurableCompactionIT.java | 34 +++++++++++++++++++ test/src/test/resources/TestCompactionStrat.jar | Bin 0 -> 1681 bytes 5 files changed, 58 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/43f787d6/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 6167b25..ad83454 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -629,12 +629,18 @@ public enum Property { // This is not a cache for loaded classes, just a way to avoid spamming the debug log static Map<String,Class<?>> loaded = Collections.synchronizedMap(new HashMap<String,Class<?>>()); - public static <T> T createInstanceFromPropertyName(AccumuloConfiguration conf, Property property, Class<T> base, T defaultInstance) { - String clazzName = conf.get(property); + private static <T> T createInstance(String context, 
String clazzName, Class<T> base, T defaultInstance) { T instance = null; try { - Class<? extends T> clazz = AccumuloVFSClassLoader.loadClass(clazzName, base); + + Class<? extends T> clazz; + if (context != null && !context.equals("")) { + clazz = AccumuloVFSClassLoader.getContextManager().loadClass(context, clazzName, base); + } else { + clazz = AccumuloVFSClassLoader.loadClass(clazzName, base); + } + instance = clazz.newInstance(); if (loaded.put(clazzName, clazz) != clazz) log.debug("Loaded class : " + clazzName); @@ -649,6 +655,19 @@ public enum Property { return instance; } + public static <T> T createTableInstanceFromPropertyName(AccumuloConfiguration conf, Property property, Class<T> base, T defaultInstance) { + String clazzName = conf.get(property); + String context = conf.get(TABLE_CLASSPATH); + + return createInstance(context, clazzName, base, defaultInstance); + } + + public static <T> T createInstanceFromPropertyName(AccumuloConfiguration conf, Property property, Class<T> base, T defaultInstance) { + String clazzName = conf.get(property); + + return createInstance(null, clazzName, base, defaultInstance); + } + public static Map<String,String> getCompactionStrategyOptions(AccumuloConfiguration tableConf) { Map<String,String> longNames = tableConf.getAllPropertiesWithPrefix(Property.TABLE_COMPACTION_STRATEGY_PREFIX); Map<String,String> result = new HashMap<String,String>(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/43f787d6/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java index 4e7a77e..226f3d8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java @@ -3073,7 +3073,7 @@ public class Tablet { long t1, t2, 
t3; // acquire file info outside of tablet lock - CompactionStrategy strategy = Property.createInstanceFromPropertyName(acuTableConf, Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class, + CompactionStrategy strategy = Property.createTableInstanceFromPropertyName(acuTableConf, Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class, new DefaultCompactionStrategy()); strategy.init(Property.getCompactionStrategyOptions(acuTableConf)); http://git-wip-us.apache.org/repos/asf/accumulo/blob/43f787d6/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index d4bc0fd..935ffeb 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -565,7 +565,7 @@ public class TabletServerResourceManager { return false; } } - CompactionStrategy strategy = Property.createInstanceFromPropertyName(tableConf, Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class, + CompactionStrategy strategy = Property.createTableInstanceFromPropertyName(tableConf, Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class, new DefaultCompactionStrategy()); strategy.init(Property.getCompactionStrategyOptions(tableConf)); MajorCompactionRequest request = new MajorCompactionRequest(tablet.getExtent(), reason, TabletServerResourceManager.this.fs, tableConf); http://git-wip-us.apache.org/repos/asf/accumulo/blob/43f787d6/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableCompactionIT.java ---------------------------------------------------------------------- diff --git 
a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableCompactionIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableCompactionIT.java index 353fdb1..2b90c81 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableCompactionIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableCompactionIT.java @@ -19,9 +19,11 @@ package org.apache.accumulo.test.functional; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.Random; +import java.util.TreeSet; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; @@ -38,6 +40,7 @@ import org.apache.accumulo.tserver.compaction.CompactionPlan; import org.apache.accumulo.tserver.compaction.CompactionStrategy; import org.apache.accumulo.tserver.compaction.MajorCompactionRequest; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.junit.Assert; import org.junit.Test; @@ -85,6 +88,37 @@ public class ConfigurableCompactionIT extends ConfigurableMacIT { runTest(c, tableName, 5); } + @Test(timeout = 60000) + public void testPerTableClasspath() throws Exception { + final Connector c = getConnector(); + final String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.instanceOperations().setProperty(Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "context1", + System.getProperty("user.dir") + "/src/test/resources/TestCompactionStrat.jar"); + c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "10"); + c.tableOperations().setProperty(tableName, Property.TABLE_CLASSPATH.getKey(), "context1"); + // EfgCompactionStrat will only compact a tablet w/ end row of 'efg'. No other tablets are compacted. 
+ c.tableOperations().setProperty(tableName, Property.TABLE_COMPACTION_STRATEGY.getKey(), "org.apache.accumulo.test.EfgCompactionStrat"); + + c.tableOperations().addSplits(tableName, new TreeSet<Text>(Arrays.asList(new Text("efg")))); + + for (char ch = 'a'; ch < 'l'; ch++) + writeFlush(c, tableName, ch + ""); + + while (countFiles(c, tableName) != 7) { + UtilWaitThread.sleep(200); + } + } + + private void writeFlush(Connector conn, String tablename, String row) throws Exception { + BatchWriter bw = conn.createBatchWriter(tablename, new BatchWriterConfig()); + Mutation m = new Mutation(row); + m.put("", "", ""); + bw.addMutation(m); + bw.close(); + conn.tableOperations().flush(tablename, null, null, true); + } + final static Random r = new Random(); private void makeFile(Connector conn, String tablename) throws Exception { http://git-wip-us.apache.org/repos/asf/accumulo/blob/43f787d6/test/src/test/resources/TestCompactionStrat.jar ---------------------------------------------------------------------- diff --git a/test/src/test/resources/TestCompactionStrat.jar b/test/src/test/resources/TestCompactionStrat.jar new file mode 100644 index 0000000..0bc38d4 Binary files /dev/null and b/test/src/test/resources/TestCompactionStrat.jar differ