Repository: accumulo Updated Branches: refs/heads/1.6.1-SNAPSHOT 0a0bdd999 -> 8bc8d4e4f refs/heads/master 35f618eee -> 4f139138a
ACCUMULO-2950 Add an RpcWrapper class Adds an RpcWrapper class to reintroduce the previous behavior in Thrift 0.9.0 that was broken by THRIFT-1805 in Thrift 0.9.1. This is done by extending the InvocationHandler that was already being used to incorporate Trace information into the RPC calls; the extended handler also translates RuntimeExceptions and Errors into a TException, which Thrift 0.9.1 will expose to the client as a TApplicationException.INTERNAL_ERROR. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8bc8d4e4 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8bc8d4e4 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8bc8d4e4 Branch: refs/heads/1.6.1-SNAPSHOT Commit: 8bc8d4e4f93a20571f4764e1fdc9779d4f42fce9 Parents: 0a0bdd9 Author: Christopher Tubbs <ctubb...@apache.org> Authored: Wed Jun 25 13:12:04 2014 -0400 Committer: Christopher Tubbs <ctubb...@apache.org> Committed: Wed Jun 25 16:26:31 2014 -0400 ---------------------------------------------------------------------- .../java/org/apache/accumulo/proxy/Proxy.java | 4 +- server/base/pom.xml | 19 +++--- .../apache/accumulo/server/util/RpcWrapper.java | 62 ++++++++++++++++++ .../accumulo/gc/SimpleGarbageCollector.java | 4 +- .../java/org/apache/accumulo/master/Master.java | 4 +- .../apache/accumulo/tserver/TabletServer.java | 4 +- .../thrift/RpcClientInvocationHandler.java | 54 ++++++++++++++++ .../thrift/RpcServerInvocationHandler.java | 53 +++++++++++++++ .../trace/instrument/thrift/TraceWrap.java | 68 ++++---------------- 9 files changed, 200 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bc8d4e4/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java ---------------------------------------------------------------------- diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java 
index 3e404b1..8130631 100644 --- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java +++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.cli.Help; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.accumulo.proxy.thrift.AccumuloProxy; +import org.apache.accumulo.server.util.RpcWrapper; import org.apache.log4j.Logger; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TCompactProtocol; @@ -100,6 +101,7 @@ public class Proxy { opts.prop.setProperty("instance", accumulo.getConfig().getInstanceName()); opts.prop.setProperty("zookeepers", accumulo.getZooKeepers()); Runtime.getRuntime().addShutdownHook(new Thread() { + @Override public void start() { try { accumulo.stop(); @@ -133,7 +135,7 @@ public class Proxy { @SuppressWarnings("unchecked") Constructor<? extends TProcessor> proxyProcConstructor = (Constructor<? extends TProcessor>) proxyProcClass.getConstructor(proxyIfaceClass); - final TProcessor processor = proxyProcConstructor.newInstance(impl); + final TProcessor processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl)); THsHaServer.Args args = new THsHaServer.Args(socket); args.processor(processor); http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bc8d4e4/server/base/pom.xml ---------------------------------------------------------------------- diff --git a/server/base/pom.xml b/server/base/pom.xml index ee14a0a..6eb91a0 100644 --- a/server/base/pom.xml +++ b/server/base/pom.xml @@ -92,23 +92,22 @@ <artifactId>zookeeper</artifactId> </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> <dependency> - <groupId>org.easymock</groupId> - <artifactId>easymock</artifactId> - <scope>test</scope> + <groupId>org.slf4j</groupId> + 
<artifactId>slf4j-log4j12</artifactId> + <scope>runtime</scope> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> + <groupId>junit</groupId> + <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> <scope>test</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bc8d4e4/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java new file mode 100644 index 0000000..02bd1ef --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.util; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +import org.apache.accumulo.trace.instrument.thrift.RpcServerInvocationHandler; +import org.apache.accumulo.trace.instrument.thrift.TraceWrap; +import org.apache.thrift.TApplicationException; +import org.apache.thrift.TException; +import org.slf4j.LoggerFactory; + +/** + * This class accommodates the changes in THRIFT-1805, which appeared in Thrift 0.9.1 and restricts client-side notification of server-side errors to + * {@link TException} only, by wrapping {@link RuntimeException} and {@link Error} as {@link TException}, so it doesn't just close the connection and look like + * a network issue, but informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as + * {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950. 
+ * + * @since 1.6.1 + */ +public class RpcWrapper { + + public static <T> T service(final T instance) { + InvocationHandler handler = new RpcServerInvocationHandler<T>(instance) { + @Override + public Object invoke(Object obj, Method method, Object[] args) throws Throwable { + try { + return super.invoke(obj, method, args); + } catch (RuntimeException e) { + String msg = e.getMessage(); + LoggerFactory.getLogger(instance.getClass()).error(msg, e); + throw new TException(msg); + } catch (Error e) { + String msg = e.getMessage(); + LoggerFactory.getLogger(instance.getClass()).error(msg, e); + throw new TException(msg); + } + } + }; + + @SuppressWarnings("unchecked") + T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); + return proxiedInstance; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bc8d4e4/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 39716eb..62641a0 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -85,6 +85,7 @@ import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.security.SystemCredentials; import org.apache.accumulo.server.tables.TableManager; import org.apache.accumulo.server.util.Halt; +import org.apache.accumulo.server.util.RpcWrapper; import org.apache.accumulo.server.util.TServerUtils; import org.apache.accumulo.server.util.TabletIterator; import org.apache.accumulo.server.zookeeper.ZooLock; @@ -92,7 +93,6 @@ import org.apache.accumulo.trace.instrument.CountSampler; import org.apache.accumulo.trace.instrument.Sampler; import 
org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; -import org.apache.accumulo.trace.instrument.thrift.TraceWrap; import org.apache.accumulo.trace.thrift.TInfo; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -645,7 +645,7 @@ public class SimpleGarbageCollector implements Iface { } private HostAndPort startStatsService() throws UnknownHostException { - Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(this)); + Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(this)); AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(instance); int port = conf.getPort(Property.GC_PORT); long maxMessageSize = conf.getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE); http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bc8d4e4/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 797e066..c367ae0 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -110,6 +110,7 @@ import org.apache.accumulo.server.tables.TableObserver; import org.apache.accumulo.server.util.DefaultMap; import org.apache.accumulo.server.util.Halt; import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.accumulo.server.util.RpcWrapper; import org.apache.accumulo.server.util.TServerUtils; import org.apache.accumulo.server.util.TServerUtils.ServerAddress; import org.apache.accumulo.server.util.time.SimpleTimer; @@ -117,7 +118,6 @@ import org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; import 
org.apache.accumulo.start.classloader.vfs.ContextManager; -import org.apache.accumulo.trace.instrument.thrift.TraceWrap; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -973,7 +973,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new IOException(e); } - Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler(this))); + Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(new MasterClientServiceHandler(this))); ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); clientService = sa.server; http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bc8d4e4/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index e2510d7..57415bd 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -179,6 +179,7 @@ import org.apache.accumulo.server.util.FileSystemMonitor; import org.apache.accumulo.server.util.Halt; import org.apache.accumulo.server.util.MasterMetadataUtil; import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.accumulo.server.util.RpcWrapper; import org.apache.accumulo.server.util.TServerUtils; import org.apache.accumulo.server.util.TServerUtils.ServerAddress; import org.apache.accumulo.server.util.time.RelativeTime; @@ -192,7 +193,6 @@ import 
org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; import org.apache.accumulo.start.classloader.vfs.ContextManager; import org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; -import org.apache.accumulo.trace.instrument.thrift.TraceWrap; import org.apache.accumulo.trace.thrift.TInfo; import org.apache.accumulo.tserver.Compactor.CompactionInfo; import org.apache.accumulo.tserver.RowLocks.RowLock; @@ -3085,7 +3085,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu private HostAndPort startTabletClientService() throws UnknownHostException { // start listening for client connection last - Iface tch = TraceWrap.service(new ThriftClientHandler()); + Iface tch = RpcWrapper.service(new ThriftClientHandler()); Processor<Iface> processor = new Processor<Iface>(tch); HostAndPort address = startServer(getSystemConfiguration(), clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server"); log.info("address = " + address); http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bc8d4e4/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/RpcClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/RpcClientInvocationHandler.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/RpcClientInvocationHandler.java new file mode 100644 index 0000000..ea57fe7 --- /dev/null +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/RpcClientInvocationHandler.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.trace.instrument.thrift; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.accumulo.trace.instrument.Span; +import org.apache.accumulo.trace.instrument.Trace; +import org.apache.accumulo.trace.instrument.Tracer; +import org.apache.accumulo.trace.thrift.TInfo; + +public class RpcClientInvocationHandler<I> implements InvocationHandler { + + private final I instance; + + protected RpcClientInvocationHandler(final I clientInstance) { + instance = clientInstance; + } + + @Override + public Object invoke(Object obj, Method method, Object[] args) throws Throwable { + if (args == null || args.length < 1 || args[0] != null) { + return method.invoke(instance, args); + } + Class<?> klass = method.getParameterTypes()[0]; + if (TInfo.class.isAssignableFrom(klass)) { + args[0] = Tracer.traceInfo(); + } + Span span = Trace.start("client:" + method.getName()); + try { + return method.invoke(instance, args); + } catch (InvocationTargetException ex) { + throw ex.getCause(); + } finally { + span.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bc8d4e4/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/RpcServerInvocationHandler.java ---------------------------------------------------------------------- diff --git 
a/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/RpcServerInvocationHandler.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/RpcServerInvocationHandler.java new file mode 100644 index 0000000..4188bf4 --- /dev/null +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/RpcServerInvocationHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.trace.instrument.thrift; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.accumulo.trace.instrument.Span; +import org.apache.accumulo.trace.instrument.Trace; +import org.apache.accumulo.trace.thrift.TInfo; + +public class RpcServerInvocationHandler<I> implements InvocationHandler { + + private final I instance; + + protected RpcServerInvocationHandler(final I serverInstance) { + instance = serverInstance; + } + + @Override + public Object invoke(Object obj, Method method, Object[] args) throws Throwable { + if (args == null || args.length < 1 || args[0] == null || !(args[0] instanceof TInfo)) { + try { + return method.invoke(instance, args); + } catch (InvocationTargetException ex) { + throw ex.getCause(); + } + } + Span span = Trace.trace((TInfo) args[0], method.getName()); + try { + return method.invoke(instance, args); + } catch (InvocationTargetException ex) { + throw ex.getCause(); + } finally { + span.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bc8d4e4/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/TraceWrap.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/TraceWrap.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/TraceWrap.java index 49fa4b2..ed91042 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/TraceWrap.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/thrift/TraceWrap.java @@ -17,16 +17,8 @@ package org.apache.accumulo.trace.instrument.thrift; import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.lang.reflect.Proxy; -import org.apache.accumulo.trace.instrument.Span; -import 
org.apache.accumulo.trace.instrument.Trace; -import org.apache.accumulo.trace.instrument.Tracer; -import org.apache.accumulo.trace.thrift.TInfo; - - /** * To move trace data from client to server, the RPC call must be annotated to take a TInfo object as its first argument. The user can simply pass null, so long * as they wrap their Client and Service objects with these functions. @@ -45,55 +37,21 @@ import org.apache.accumulo.trace.thrift.TInfo; * */ public class TraceWrap { - - @SuppressWarnings("unchecked") + public static <T> T service(final T instance) { - InvocationHandler handler = new InvocationHandler() { - @Override - public Object invoke(Object obj, Method method, Object[] args) throws Throwable { - if (args == null || args.length < 1 || args[0] == null || !(args[0] instanceof TInfo)) { - try { - return method.invoke(instance, args); - } catch (InvocationTargetException ex) { - throw ex.getCause(); - } - } - Span span = Trace.trace((TInfo) args[0], method.getName()); - try { - return method.invoke(instance, args); - } catch (InvocationTargetException ex) { - throw ex.getCause(); - } finally { - span.stop(); - } - } - }; - return (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); + InvocationHandler handler = new RpcServerInvocationHandler<T>(instance); + return wrappedInstance(handler, instance); } - - @SuppressWarnings("unchecked") + public static <T> T client(final T instance) { - InvocationHandler handler = new InvocationHandler() { - @Override - public Object invoke(Object obj, Method method, Object[] args) throws Throwable { - if (args == null || args.length < 1 || args[0] != null) { - return method.invoke(instance, args); - } - Class<?> klass = method.getParameterTypes()[0]; - if (TInfo.class.isAssignableFrom(klass)) { - args[0] = Tracer.traceInfo(); - } - Span span = Trace.start("client:" + method.getName()); - try { - return method.invoke(instance, args); - } catch 
(InvocationTargetException ex) { - throw ex.getCause(); - } finally { - span.stop(); - } - } - }; - return (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); + InvocationHandler handler = new RpcClientInvocationHandler<T>(instance); + return wrappedInstance(handler, instance); } - + + private static <T> T wrappedInstance(final InvocationHandler handler, final T instance) { + @SuppressWarnings("unchecked") + T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); + return proxiedInstance; + } + }