ACCUMULO-4065 Tweak generics in RpcWrapper and related code * depend more on compile time checks with generic checking * use better naming of generic params (I for Iface, P for Processor) to clarify relationship between types * remove unneeded casts and warnings suppressions
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bd8cf5e2 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bd8cf5e2 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bd8cf5e2 Branch: refs/heads/master Commit: bd8cf5e21305b63bd4bd62c92f7ed045eb440830 Parents: 5724df5 Author: Christopher Tubbs <ctubb...@apache.org> Authored: Wed Dec 2 20:04:04 2015 -0500 Committer: Christopher Tubbs <ctubb...@apache.org> Committed: Wed Dec 2 20:04:04 2015 -0500 ---------------------------------------------------------------------- .../java/org/apache/accumulo/proxy/Proxy.java | 22 ++++++++++--------- .../apache/accumulo/server/util/RpcWrapper.java | 11 +++++----- .../accumulo/server/util/RpcWrapperTest.java | 23 +++++++++++--------- .../accumulo/gc/SimpleGarbageCollector.java | 2 +- .../java/org/apache/accumulo/master/Master.java | 4 ++-- .../apache/accumulo/tserver/TabletServer.java | 2 +- 6 files changed, 35 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd8cf5e2/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java ---------------------------------------------------------------------- diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java index 3368d20..53221e0 100644 --- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java +++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java @@ -30,7 +30,6 @@ import org.apache.accumulo.proxy.thrift.AccumuloProxy; import org.apache.accumulo.server.util.RpcWrapper; import org.apache.log4j.Logger; import org.apache.thrift.TBaseProcessor; -import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.THsHaServer; @@ -125,20 +124,23 @@ public class Proxy { public static TServer createProxyServer(Class<?> api, Class<?> implementor, final int port, Class<? extends TProtocolFactory> protoClass, Properties properties) throws Exception { - final TNonblockingServerSocket socket = new TNonblockingServerSocket(port); - - // create the implementor - Object impl = implementor.getConstructor(Properties.class).newInstance(properties); - - Class<?> proxyProcClass = Class.forName(api.getName() + "$Processor"); Class<?> proxyIfaceClass = Class.forName(api.getName() + "$Iface"); + return createProxyServer(proxyIfaceClass, api, implementor, port, protoClass, properties); + } + private static <I,P extends TBaseProcessor<I>> TServer createProxyServer(final Class<I> proxyIfaceClass, Class<?> api, final Class<?> implementor, + final int port, final Class<? extends TProtocolFactory> protoClass, final Properties properties) throws Exception { + @SuppressWarnings("unchecked") + Class<P> proxyProcClass = (Class<P>) Class.forName(api.getName() + "$Processor"); + + // create the implementor @SuppressWarnings("unchecked") - Constructor<? extends TProcessor> proxyProcConstructor = (Constructor<? extends TProcessor>) proxyProcClass.getConstructor(proxyIfaceClass); + I impl = (I) implementor.getConstructor(Properties.class).newInstance(properties); - @SuppressWarnings({"rawtypes", "unchecked"}) - final TProcessor processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl, ((TBaseProcessor) proxyProcConstructor.newInstance(impl)).getProcessMapView())); + Constructor<P> proxyProcConstructor = proxyProcClass.getConstructor(proxyIfaceClass); + P processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl, proxyProcConstructor.newInstance(impl))); + TNonblockingServerSocket socket = new TNonblockingServerSocket(port); THsHaServer.Args args = new THsHaServer.Args(socket); args.processor(processor); final long maxFrameSize = AccumuloConfiguration.getMemoryInBytes(properties.getProperty("maxFrameSize", "16M")); http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd8cf5e2/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java index aafe37c..78a9d4f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java @@ -28,7 +28,7 @@ import org.apache.accumulo.trace.instrument.thrift.RpcServerInvocationHandler; import org.apache.accumulo.trace.instrument.thrift.TraceWrap; import org.apache.thrift.ProcessFunction; import org.apache.thrift.TApplicationException; -import org.apache.thrift.TBase; +import org.apache.thrift.TBaseProcessor; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,14 +51,15 @@ import org.slf4j.LoggerFactory; public class RpcWrapper { private static final Logger log = LoggerFactory.getLogger(RpcWrapper.class); - public static <T> T service(final T instance, @SuppressWarnings("rawtypes") final Map<String,ProcessFunction<T,? extends TBase>> processorView) { + public static <I> I service(final I instance, final TBaseProcessor<I> processor) { + final Map<String,ProcessFunction<I,?>> processorView = processor.getProcessMapView(); final Set<String> onewayMethods = getOnewayMethods(processorView); log.debug("Found oneway Thrift methods: " + onewayMethods); InvocationHandler handler = getInvocationHandler(instance, onewayMethods); @SuppressWarnings("unchecked") - T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); + I proxiedInstance = (I) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); return proxiedInstance; } @@ -90,7 +91,7 @@ public class RpcWrapper { }; } - protected static <T> Set<String> getOnewayMethods(@SuppressWarnings("rawtypes") Map<String,ProcessFunction<T,? extends TBase>> processorView) { + protected static Set<String> getOnewayMethods(Map<String,?> processorView) { // Get a handle on the isOnewayMethod and make it accessible final Method isOnewayMethod; try { @@ -106,7 +107,7 @@ public class RpcWrapper { try { final Set<String> onewayMethods = new HashSet<String>(); - for (@SuppressWarnings("rawtypes") Entry<String,ProcessFunction<T,? extends TBase>> entry : processorView.entrySet()) { + for (Entry<String,?> entry : processorView.entrySet()) { try { if ((Boolean) isOnewayMethod.invoke(entry.getValue())) { onewayMethods.add(entry.getKey()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd8cf5e2/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java index 0237315..06ddc1e 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java @@ -37,11 +37,14 @@ import com.google.common.collect.Sets; public class RpcWrapperTest { private static final String RTE_MESSAGE = "RpcWrapperTest's RuntimeException Message"; + /** * Given a method name and whether or not the method is oneway, construct a ProcessFunction. * - * @param methodName The service method name. - * @param isOneway Is the method oneway. + * @param methodName + * The service method name. + * @param isOneway + * Is the method oneway. * @return A ProcessFunction. */ private fake_proc<FakeService> createProcessFunction(String methodName, boolean isOneway) { @@ -50,8 +53,7 @@ public class RpcWrapperTest { @Test public void testSomeOnewayMethods() { - @SuppressWarnings("rawtypes") - Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new HashMap<String,ProcessFunction<FakeService,? extends TBase>>(); + Map<String,ProcessFunction<FakeService,?>> procs = new HashMap<String,ProcessFunction<FakeService,?>>(); procs.put("foo", createProcessFunction("foo", true)); procs.put("foobar", createProcessFunction("foobar", false)); procs.put("bar", createProcessFunction("bar", true)); @@ -63,8 +65,7 @@ public class RpcWrapperTest { @Test public void testNoOnewayMethods() { - @SuppressWarnings("rawtypes") - Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new HashMap<String,ProcessFunction<FakeService,? extends TBase>>(); + Map<String,ProcessFunction<FakeService,?>> procs = new HashMap<String,ProcessFunction<FakeService,?>>(); procs.put("foo", createProcessFunction("foo", false)); procs.put("foobar", createProcessFunction("foobar", false)); procs.put("bar", createProcessFunction("bar", false)); @@ -76,8 +77,7 @@ public class RpcWrapperTest { @Test public void testAllOnewayMethods() { - @SuppressWarnings("rawtypes") - Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new HashMap<String,ProcessFunction<FakeService,? extends TBase>>(); + Map<String,ProcessFunction<FakeService,?>> procs = new HashMap<String,ProcessFunction<FakeService,?>>(); procs.put("foo", createProcessFunction("foo", true)); procs.put("foobar", createProcessFunction("foobar", true)); procs.put("bar", createProcessFunction("bar", true)); @@ -164,8 +164,11 @@ public class RpcWrapperTest { */ interface FakeService { void foo(); + String foobar(); + int bar(); + long barfoo(); } @@ -197,7 +200,7 @@ public class RpcWrapperTest { /** * A fake ProcessFunction implementation for testing that allows injection of method name and oneway. */ - private static class fake_proc<I extends FakeService> extends org.apache.thrift.ProcessFunction<I, foo_args> { + private static class fake_proc<I extends FakeService> extends org.apache.thrift.ProcessFunction<I,foo_args> { final private boolean isOneway; public fake_proc(String methodName, boolean isOneway) { @@ -235,7 +238,7 @@ public class RpcWrapperTest { /** * Fake arguments for our fake service. */ - private static class foo_args implements org.apache.thrift.TBase<foo_args, fake_fields> { + private static class foo_args implements org.apache.thrift.TBase<foo_args,fake_fields> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd8cf5e2/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index b4afda8..1c6836b 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -702,7 +702,7 @@ public class SimpleGarbageCollector implements Iface { } private HostAndPort startStatsService() throws UnknownHostException { - Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(this, new Processor<Iface>(this).getProcessMapView())); + Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(this, new Processor<Iface>(this))); int port = config.getPort(Property.GC_PORT); long maxMessageSize = config.getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE); HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port); http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd8cf5e2/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index af481c8..0434714 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -1032,8 +1032,8 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new IOException(e); } - Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(new MasterClientServiceHandler(this), - new Processor<Iface>(new MasterClientServiceHandler(this)).getProcessMapView())); + Processor<Iface> processor = new Processor<Iface>( + RpcWrapper.service(new MasterClientServiceHandler(this), new Processor<Iface>(new MasterClientServiceHandler(this)))); ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); clientService = sa.server; http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd8cf5e2/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 651df66..dce66b0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -3160,7 +3160,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu private HostAndPort startTabletClientService() throws UnknownHostException { // start listening for client connection last ThriftClientHandler handler = new ThriftClientHandler(); - Iface tch = RpcWrapper.service(handler, new Processor<Iface>(handler).getProcessMapView()); + Iface tch = RpcWrapper.service(handler, new Processor<Iface>(handler)); Processor<Iface> processor = new Processor<Iface>(tch); HostAndPort address = startServer(getSystemConfiguration(), clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server"); log.info("address = " + address);