Merge branch '1.6' into 1.7
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0e3af1e8 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0e3af1e8 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0e3af1e8 Branch: refs/heads/master Commit: 0e3af1e8560b0a28a4415f8da38a561965d552dc Parents: e35e189 bd8cf5e Author: Christopher Tubbs <ctubb...@apache.org> Authored: Thu Dec 3 15:36:49 2015 -0500 Committer: Christopher Tubbs <ctubb...@apache.org> Committed: Thu Dec 3 15:36:49 2015 -0500 ---------------------------------------------------------------------- .../src/main/java/org/apache/accumulo/proxy/Proxy.java | 2 +- .../java/org/apache/accumulo/server/rpc/RpcWrapper.java | 12 ++++++------ .../org/apache/accumulo/server/rpc/RpcWrapperTest.java | 9 +++------ .../org/apache/accumulo/gc/SimpleGarbageCollector.java | 2 +- .../main/java/org/apache/accumulo/master/Master.java | 4 ++-- .../java/org/apache/accumulo/tserver/TabletServer.java | 4 ++-- 6 files changed, 15 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e3af1e8/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java ---------------------------------------------------------------------- diff --cc proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java index ca39786,53221e0..87e2c58 --- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java +++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java @@@ -161,113 -117,38 +161,113 @@@ public class Proxy implements KeywordEx Class<? 
extends TProtocolFactory> protoFactoryClass = Class.forName(opts.prop.getProperty("protocolFactory", TCompactProtocol.Factory.class.getName())) .asSubclass(TProtocolFactory.class); + TProtocolFactory protoFactory = protoFactoryClass.newInstance(); int port = Integer.parseInt(opts.prop.getProperty("port")); - TServer server = createProxyServer(AccumuloProxy.class, ProxyServer.class, port, protoFactoryClass, opts.prop); - server.serve(); + String hostname = opts.prop.getProperty(THRIFT_SERVER_HOSTNAME, THRIFT_SERVER_HOSTNAME_DEFAULT); + HostAndPort address = HostAndPort.fromParts(hostname, port); + ServerAddress server = createProxyServer(address, protoFactory, opts.prop); + // Wait for the server to come up + while (!server.server.isServing()) { + Thread.sleep(100); + } + log.info("Proxy server started on " + server.getAddress()); + while (server.server.isServing()) { + Thread.sleep(1000); + } + } + + public static void main(String[] args) throws Exception { + new Proxy().execute(args); } - public static TServer createProxyServer(Class<?> api, Class<?> implementor, final int port, Class<? extends TProtocolFactory> protoClass, - Properties properties) throws Exception { - Class<?> proxyIfaceClass = Class.forName(api.getName() + "$Iface"); - return createProxyServer(proxyIfaceClass, api, implementor, port, protoClass, properties); + public static ServerAddress createProxyServer(HostAndPort address, TProtocolFactory protocolFactory, Properties properties) throws Exception { + return createProxyServer(address, protocolFactory, properties, ClientConfiguration.loadDefault()); } - private static <I,P extends TBaseProcessor<I>> TServer createProxyServer(final Class<I> proxyIfaceClass, Class<?> api, final Class<?> implementor, - final int port, final Class<? 
extends TProtocolFactory> protoClass, final Properties properties) throws Exception { - @SuppressWarnings("unchecked") - Class<P> proxyProcClass = (Class<P>) Class.forName(api.getName() + "$Processor"); - - // create the implementor - @SuppressWarnings("unchecked") - I impl = (I) implementor.getConstructor(Properties.class).newInstance(properties); - - Constructor<P> proxyProcConstructor = proxyProcClass.getConstructor(proxyIfaceClass); - P processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl, proxyProcConstructor.newInstance(impl))); - - TNonblockingServerSocket socket = new TNonblockingServerSocket(port); - THsHaServer.Args args = new THsHaServer.Args(socket); - args.processor(processor); - final long maxFrameSize = AccumuloConfiguration.getMemoryInBytes(properties.getProperty("maxFrameSize", "16M")); - if (maxFrameSize > Integer.MAX_VALUE) - throw new RuntimeException(maxFrameSize + " is larger than MAX_INT"); - args.transportFactory(new TFramedTransport.Factory((int) maxFrameSize)); - args.protocolFactory(protoClass.newInstance()); - args.maxReadBufferBytes = maxFrameSize; - return new THsHaServer(args); + public static ServerAddress createProxyServer(HostAndPort address, TProtocolFactory protocolFactory, Properties properties, ClientConfiguration clientConf) + throws Exception { + final int numThreads = Integer.parseInt(properties.getProperty(THRIFT_THREAD_POOL_SIZE_KEY, THRIFT_THREAD_POOL_SIZE_DEFAULT)); + final long maxFrameSize = AccumuloConfiguration.getMemoryInBytes(properties.getProperty(THRIFT_MAX_FRAME_SIZE_KEY, THRIFT_MAX_FRAME_SIZE_DEFAULT)); + final int simpleTimerThreadpoolSize = Integer.parseInt(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE.getDefaultValue()); + // How frequently to try to resize the thread pool + final long threadpoolResizeInterval = 1000l * 5; + // No timeout + final long serverSocketTimeout = 0l; + // Use the new hadoop metrics2 support + final MetricsFactory metricsFactory = new MetricsFactory(false); + final 
String serverName = "Proxy", threadName = "Accumulo Thrift Proxy"; + + // create the implementation of the proxy interface + ProxyServer impl = new ProxyServer(properties); + + // Wrap the implementation -- translate some exceptions - AccumuloProxy.Iface wrappedImpl = RpcWrapper.service(impl, new AccumuloProxy.Processor<AccumuloProxy.Iface>(impl).getProcessMapView()); ++ AccumuloProxy.Iface wrappedImpl = RpcWrapper.service(impl, new AccumuloProxy.Processor<AccumuloProxy.Iface>(impl)); + + // Create the processor from the implementation + TProcessor processor = new AccumuloProxy.Processor<AccumuloProxy.Iface>(wrappedImpl); + + // Get the type of thrift server to instantiate + final String serverTypeStr = properties.getProperty(THRIFT_SERVER_TYPE, THRIFT_SERVER_TYPE_DEFAULT); + ThriftServerType serverType = DEFAULT_SERVER_TYPE; + if (!THRIFT_SERVER_TYPE_DEFAULT.equals(serverTypeStr)) { + serverType = ThriftServerType.get(serverTypeStr); + } + + SslConnectionParams sslParams = null; + SaslServerConnectionParams saslParams = null; + switch (serverType) { + case SSL: + sslParams = SslConnectionParams.forClient(ClientContext.convertClientConfig(clientConf)); + break; + case SASL: + if (!clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j capability + log.error("FATAL: SASL thrift server was requested but it is disabled in client configuration"); + throw new RuntimeException("SASL is not enabled in configuration"); + } + + // Kerberos needs to be enabled to use it + if (!UserGroupInformation.isSecurityEnabled()) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j capability + log.error("FATAL: Hadoop security is not enabled"); + throw new RuntimeException(); + } + + // Login via principal and keytab + final String kerberosPrincipal = properties.getProperty(KERBEROS_PRINCIPAL, ""), + kerberosKeytab = 
properties.getProperty(KERBEROS_KEYTAB, ""); + if (StringUtils.isBlank(kerberosPrincipal) || StringUtils.isBlank(kerberosKeytab)) { + // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j capability + log.error("FATAL: Kerberos principal and keytab must be provided"); + throw new RuntimeException(); + } + UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytab); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + log.info("Logged in as " + ugi.getUserName()); + + // The kerberosPrimary set in the SASL server needs to match the principal we're logged in as. + final String shortName = ugi.getShortUserName(); + log.info("Setting server primary to {}", shortName); + clientConf.setProperty(ClientProperty.KERBEROS_SERVER_PRIMARY, shortName); + + KerberosToken token = new KerberosToken(); + saslParams = new SaslServerConnectionParams(clientConf, token, null); + + processor = new UGIAssumingProcessor(processor); + + break; + default: + // nothing to do -- no extra configuration necessary + break; + } + + // Hook up support for tracing for thrift calls + TimedProcessor timedProcessor = new TimedProcessor(metricsFactory, processor, serverName, threadName); + + // Create the thrift server with our processor and properties + ServerAddress serverAddr = TServerUtils.startTServer(address, serverType, timedProcessor, protocolFactory, serverName, threadName, numThreads, + simpleTimerThreadpoolSize, threadpoolResizeInterval, maxFrameSize, sslParams, saslParams, serverSocketTimeout); + + return serverAddr; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e3af1e8/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java index 585eb27,0000000..ec68166 mode 100644,000000..100644 --- 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java @@@ -1,128 -1,0 +1,128 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.rpc; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler; +import org.apache.accumulo.core.trace.wrappers.TraceWrap; +import org.apache.thrift.ProcessFunction; +import org.apache.thrift.TApplicationException; - import org.apache.thrift.TBase; ++import org.apache.thrift.TBaseProcessor; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class accommodates the changes in THRIFT-1805, which appeared in Thrift 0.9.1 and restricts client-side notification of server-side errors to + * {@link TException} only, by wrapping {@link RuntimeException} and {@link Error} as {@link TException}, so it doesn't just close the connection and look like + * a network issue, but 
informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as + * {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950. + * + * ACCUMULO-4065 found that the above exception-wrapping is not appropriate for Thrift's implementation of oneway methods. A oneway method is defined as a + * method for which the client does not wait for a return. Normally, this is acceptable as these methods are void. Therefore, if another client reuses the + * connection to send a new RPC, there is no "extra" data sitting on the InputStream from the Socket (that the server sent). However, the implementation of a + * oneway method <em>does</em> send a response to the client when the implementation throws a {@link TException}. This message showing up on the client's + * InputStream makes the Thrift Connection unusable for future use. As long as the Thrift implementation sends a message back when oneway methods + * throw a {@link TException}, we must make sure that we don't re-wrap-and-throw any exceptions as {@link TException}s. + * + * @since 1.6.1 + */ +public class RpcWrapper { + private static final Logger log = LoggerFactory.getLogger(RpcWrapper.class); + - public static <T> T service(final T instance, @SuppressWarnings("rawtypes") final Map<String,ProcessFunction<T,? 
extends TBase>> processorView) { ++ public static <I> I service(final I instance, final TBaseProcessor<I> processor) { ++ final Map<String,ProcessFunction<I,?>> processorView = processor.getProcessMapView(); + final Set<String> onewayMethods = getOnewayMethods(processorView); + log.debug("Found oneway Thrift methods: " + onewayMethods); + + InvocationHandler handler = getInvocationHandler(instance, onewayMethods); + + @SuppressWarnings("unchecked") - T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); ++ I proxiedInstance = (I) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); + return proxiedInstance; + } + + protected static <T> RpcServerInvocationHandler<T> getInvocationHandler(final T instance, final Set<String> onewayMethods) { + return new RpcServerInvocationHandler<T>(instance) { + private final Logger log = LoggerFactory.getLogger(instance.getClass()); + + @Override + public Object invoke(Object obj, Method method, Object[] args) throws Throwable { + // e.g. ThriftClientHandler.flush(TInfo, TCredentials, ...) + try { + return super.invoke(obj, method, args); + } catch (RuntimeException e) { + String msg = e.getMessage(); + log.error(msg, e); + if (onewayMethods.contains(method.getName())) { + throw e; + } + throw new TException(msg); + } catch (Error e) { + String msg = e.getMessage(); + log.error(msg, e); + if (onewayMethods.contains(method.getName())) { + throw e; + } + throw new TException(msg); + } + } + }; + } + - protected static <T> Set<String> getOnewayMethods(@SuppressWarnings("rawtypes") Map<String,ProcessFunction<T,? 
extends TBase>> processorView) { ++ protected static Set<String> getOnewayMethods(Map<String,?> processorView) { + // Get a handle on the isOnewayMethod and make it accessible + final Method isOnewayMethod; + try { + isOnewayMethod = ProcessFunction.class.getDeclaredMethod("isOneway"); + } catch (NoSuchMethodException e) { + throw new RuntimeException("Could not access isOneway method", e); + } catch (SecurityException e) { + throw new RuntimeException("Could not access isOneway method", e); + } + // In java7, this appears to be copying the method, but it's trivial for us to return the object to how it was before. + final boolean accessible = isOnewayMethod.isAccessible(); + isOnewayMethod.setAccessible(true); + + try { + final Set<String> onewayMethods = new HashSet<String>(); - for (@SuppressWarnings("rawtypes") - Entry<String,ProcessFunction<T,? extends TBase>> entry : processorView.entrySet()) { ++ for (Entry<String,?> entry : processorView.entrySet()) { + try { + if ((Boolean) isOnewayMethod.invoke(entry.getValue())) { + onewayMethods.add(entry.getKey()); + } + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + return onewayMethods; + } finally { + // Reset it back to how it was. + isOnewayMethod.setAccessible(accessible); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e3af1e8/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java ---------------------------------------------------------------------- diff --cc server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java index e4e9402,0000000..39d3705 mode 100644,000000..100644 --- a/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java @@@ -1,304 -1,0 +1,301 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.rpc; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler; +import org.apache.accumulo.server.rpc.RpcWrapper; +import org.apache.thrift.ProcessFunction; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TProtocol; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Verification that RpcWrapper correctly mangles Exceptions to work around Thrift. + */ +public class RpcWrapperTest { + + private static final String RTE_MESSAGE = "RpcWrapperTest's RuntimeException Message"; + + /** + * Given a method name and whether or not the method is oneway, construct a ProcessFunction. + * + * @param methodName + * The service method name. + * @param isOneway + * Is the method oneway. + * @return A ProcessFunction. + */ + private fake_proc<FakeService> createProcessFunction(String methodName, boolean isOneway) { + return new fake_proc<FakeService>(methodName, isOneway); + } + + @Test + public void testSomeOnewayMethods() { - @SuppressWarnings("rawtypes") - Map<String,ProcessFunction<FakeService,? 
extends TBase>> procs = new HashMap<String,ProcessFunction<FakeService,? extends TBase>>(); ++ Map<String,ProcessFunction<FakeService,?>> procs = new HashMap<String,ProcessFunction<FakeService,?>>(); + procs.put("foo", createProcessFunction("foo", true)); + procs.put("foobar", createProcessFunction("foobar", false)); + procs.put("bar", createProcessFunction("bar", true)); + procs.put("barfoo", createProcessFunction("barfoo", false)); + + Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs); + Assert.assertEquals(Sets.newHashSet("foo", "bar"), onewayMethods); + } + + @Test + public void testNoOnewayMethods() { - @SuppressWarnings("rawtypes") - Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new HashMap<String,ProcessFunction<FakeService,? extends TBase>>(); ++ Map<String,ProcessFunction<FakeService,?>> procs = new HashMap<String,ProcessFunction<FakeService,?>>(); + procs.put("foo", createProcessFunction("foo", false)); + procs.put("foobar", createProcessFunction("foobar", false)); + procs.put("bar", createProcessFunction("bar", false)); + procs.put("barfoo", createProcessFunction("barfoo", false)); + + Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs); + Assert.assertEquals(Collections.<String> emptySet(), onewayMethods); + } + + @Test + public void testAllOnewayMethods() { - @SuppressWarnings("rawtypes") - Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new HashMap<String,ProcessFunction<FakeService,? 
extends TBase>>(); ++ Map<String,ProcessFunction<FakeService,?>> procs = new HashMap<String,ProcessFunction<FakeService,?>>(); + procs.put("foo", createProcessFunction("foo", true)); + procs.put("foobar", createProcessFunction("foobar", true)); + procs.put("bar", createProcessFunction("bar", true)); + procs.put("barfoo", createProcessFunction("barfoo", true)); + + Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs); + Assert.assertEquals(Sets.newHashSet("foo", "foobar", "bar", "barfoo"), onewayMethods); + } + + @Test + public void testNoExceptionWrappingForOneway() throws Throwable { + final Object[] args = new Object[0]; + + final FakeService impl = new FakeServiceImpl(); + + // "short" names throw RTEs and are oneway, while long names do not throw exceptions and are not oneway. + RpcServerInvocationHandler<FakeService> handler = RpcWrapper.getInvocationHandler(impl, Sets.newHashSet("foo", "bar")); + + // Should throw an exception, but not be wrapped because the method is oneway + try { + handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args); + Assert.fail("Expected an exception"); + } catch (RuntimeException e) { + Assert.assertEquals(RTE_MESSAGE, e.getMessage()); + } + + // Should not throw an exception + handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args); + } + + @Test + public void testExceptionWrappingForNonOneway() throws Throwable { + final Object[] args = new Object[0]; + + final FakeService impl = new FakeServiceImpl(); + + // "short" names throw RTEs and are not oneway, while long names do not throw exceptions and are oneway. 
+ RpcServerInvocationHandler<FakeService> handler = RpcWrapper.getInvocationHandler(impl, Sets.newHashSet("foobar", "barfoo")); + + // Should throw an exception, but not be wrapped because the method is oneway + try { + handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args); + Assert.fail("Expected an exception"); + } catch (TException e) { + // The InvocationHandler should take the exception from the RTE and make it a TException + Assert.assertEquals(RTE_MESSAGE, e.getMessage()); + } + + // Should not throw an exception + handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args); + } + + // + // Some hacked together classes/interfaces that mimic what Thrift is doing. + // + + /** + * Some fake fields for our fake arguments. + */ + private static class fake_fields implements org.apache.thrift.TFieldIdEnum { + @Override + public short getThriftFieldId() { + throw new UnsupportedOperationException(); + } + + @Override + public String getFieldName() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException(); + } + } + + /** + * A fake thrift service + */ + interface FakeService { + void foo(); + + String foobar(); + + int bar(); + + long barfoo(); + } + + /** + * An implementation of the fake thrift service. The "short" names throw RTEs, while long names do not. + */ + public static class FakeServiceImpl implements FakeService { + @Override + public void foo() { + throw new RuntimeException(RTE_MESSAGE); + } + + @Override + public String foobar() { + return ""; + } + + @Override + public int bar() { + throw new RuntimeException(RTE_MESSAGE); + } + + @Override + public long barfoo() { + return 0; + } + }; + + /** + * A fake ProcessFunction implementation for testing that allows injection of method name and oneway. 
+ */ + private static class fake_proc<I extends FakeService> extends org.apache.thrift.ProcessFunction<I,foo_args> { + final private boolean isOneway; + + public fake_proc(String methodName, boolean isOneway) { + super(methodName); + this.isOneway = isOneway; + } + + @Override + protected boolean isOneway() { + return isOneway; + } + + @SuppressWarnings("rawtypes") + @Override + public TBase getResult(I iface, foo_args args) throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public foo_args getEmptyArgsInstance() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException(); + } + } + + /** + * Fake arguments for our fake service. + */ + private static class foo_args implements org.apache.thrift.TBase<foo_args,fake_fields> { + + private static final long serialVersionUID = 1L; + + @Override + public boolean equals(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(foo_args o) { + throw new UnsupportedOperationException(); + } + + @Override + public void read(TProtocol iprot) throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public void write(TProtocol oprot) throws TException { + throw new UnsupportedOperationException(); + } + + @Override + public fake_fields fieldForId(int fieldId) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isSet(fake_fields field) { + throw new UnsupportedOperationException(); + } + + @Override + public Object getFieldValue(fake_fields field) { + throw new UnsupportedOperationException(); + } + + @Override + public void setFieldValue(fake_fields field, Object value) { + throw new UnsupportedOperationException(); + } + + @Override + 
public TBase<foo_args,fake_fields> deepCopy() { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e3af1e8/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 4d55461,1c6836b..6bb8f9a --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@@ -709,16 -702,9 +709,16 @@@ public class SimpleGarbageCollector ext } private HostAndPort startStatsService() throws UnknownHostException { - Iface rpcProxy = RpcWrapper.service(this, new Processor<Iface>(this).getProcessMapView()); - Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(this, new Processor<Iface>(this))); - int port = config.getPort(Property.GC_PORT); - long maxMessageSize = config.getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE); ++ Iface rpcProxy = RpcWrapper.service(this, new Processor<Iface>(this)); + final Processor<Iface> processor; + if (ThriftServerType.SASL == getThriftServerType()) { + Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), getConfiguration()); + processor = new Processor<Iface>(tcProxy); + } else { + processor = new Processor<Iface>(rpcProxy); + } + int port = getConfiguration().getPort(Property.GC_PORT); + long maxMessageSize = getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE); HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port); log.debug("Starting garbage collector listening on " + result); try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e3af1e8/server/master/src/main/java/org/apache/accumulo/master/Master.java 
---------------------------------------------------------------------- diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java index 02e1132,0434714..1370989 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@@ -1147,38 -1032,10 +1147,38 @@@ public class Master extends AccumuloSer throw new IOException(e); } - Processor<Iface> processor = new Processor<Iface>( - RpcWrapper.service(new MasterClientServiceHandler(this), new Processor<Iface>(new MasterClientServiceHandler(this)))); - ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master", - "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); + ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot); + + // Make sure that we have a secret key (either a new one or an old one from ZK) before we start + // the master client service. 
+ if (null != authenticationTokenKeyManager && null != keyDistributor) { + log.info("Starting delegation-token key manager"); + keyDistributor.initialize(); + authenticationTokenKeyManager.start(); + boolean logged = false; + while (!authenticationTokenKeyManager.isInitialized()) { + // Print out a status message when we start waiting for the key manager to get initialized + if (!logged) { + log.info("Waiting for AuthenticationTokenKeyManager to be initialized"); + logged = true; + } + UtilWaitThread.sleep(200); + } + // And log when we are initialized + log.info("AuthenticationTokenSecretManager is initialized"); + } + + clientHandler = new MasterClientServiceHandler(this); - Iface rpcProxy = RpcWrapper.service(clientHandler, new Processor<Iface>(clientHandler).getProcessMapView()); ++ Iface rpcProxy = RpcWrapper.service(clientHandler, new Processor<Iface>(clientHandler)); + final Processor<Iface> processor; + if (ThriftServerType.SASL == getThriftServerType()) { + Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass(), getConfiguration()); + processor = new Processor<Iface>(tcredsProxy); + } else { + processor = new Processor<Iface>(rpcProxy); + } + ServerAddress sa = TServerUtils.startServer(this, hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, + Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); clientService = sa.server; String address = sa.address.toString(); log.info("Setting master lock data to " + address); @@@ -1187,43 -1044,6 +1187,43 @@@ while (!clientService.isServing()) { UtilWaitThread.sleep(100); } + + // Start the daemon to scan the replication table and make units of work + replicationWorkDriver = new ReplicationDriver(this); + replicationWorkDriver.start(); + + // Start the daemon to assign work to tservers to replicate to our peers + try { + replicationWorkAssigner = new WorkDriver(this); + } catch (AccumuloException | 
AccumuloSecurityException e) { + log.error("Caught exception trying to initialize replication WorkDriver", e); + throw new RuntimeException(e); + } + replicationWorkAssigner.start(); + + // Start the replication coordinator which assigns tservers to service replication requests + MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this); + ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor = new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>( - RpcWrapper.service(impl, new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(impl).getProcessMapView())); ++ RpcWrapper.service(impl, new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(impl))); + ServerAddress replAddress = TServerUtils.startServer(this, hostname, Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor, + "Master Replication Coordinator", "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, + Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); + + log.info("Started replication coordinator service at " + replAddress.address); + + // Advertise that port we used so peers don't have to be told what it is + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, + replAddress.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); + + // Register replication metrics + MasterMetricsFactory factory = new MasterMetricsFactory(getConfiguration(), this); + Metrics replicationMetrics = factory.createReplicationMetrics(); + try { + replicationMetrics.register(); + } catch (Exception e) { + log.error("Failed to register replication metrics", e); + } + while (clientService.isServing()) { UtilWaitThread.sleep(500); } 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e3af1e8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 3022a76,dce66b0..1080d8d --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -2332,46 -3160,14 +2332,46 @@@ public class TabletServer extends Accum private HostAndPort startTabletClientService() throws UnknownHostException { // start listening for client connection last ThriftClientHandler handler = new ThriftClientHandler(); - Iface rpcProxy = RpcWrapper.service(handler, new Processor<Iface>(handler).getProcessMapView()); - Iface tch = RpcWrapper.service(handler, new Processor<Iface>(handler)); - Processor<Iface> processor = new Processor<Iface>(tch); - HostAndPort address = startServer(getSystemConfiguration(), clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server"); ++ Iface rpcProxy = RpcWrapper.service(handler, new Processor<Iface>(handler)); + final Processor<Iface> processor; + if (ThriftServerType.SASL == getThriftServerType()) { + Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class, getConfiguration()); + processor = new Processor<Iface>(tcredProxy); + } else { + processor = new Processor<Iface>(rpcProxy); + } + HostAndPort address = startServer(getServerConfigurationFactory().getConfiguration(), clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor, + "Thrift Client Server"); log.info("address = " + address); return address; } - ZooLock getLock() { + private HostAndPort startReplicationService() throws UnknownHostException { + final ReplicationServicerHandler handler = new ReplicationServicerHandler(this); - ReplicationServicer.Iface rpcProxy = 
RpcWrapper.service(handler, new ReplicationServicer.Processor<ReplicationServicer.Iface>(handler).getProcessMapView()); ++ ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler, new ReplicationServicer.Processor<ReplicationServicer.Iface>(handler)); + ReplicationServicer.Iface repl = TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass(), getConfiguration()); + ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl); + AccumuloConfiguration conf = getServerConfigurationFactory().getConfiguration(); + Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); + ServerAddress sp = TServerUtils.startServer(this, clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, + "ReplicationServicerHandler", "Replication Servicer", null, Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty); + this.replServer = sp.server; + log.info("Started replication service on " + sp.address); + + try { + // The replication service is unique to the thrift service for a tserver, not just a host. + // Advertise the host and port for replication service given the host and port for the tserver. + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + ReplicationConstants.ZOO_TSERVERS + "/" + clientAddress.toString(), + sp.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); + } catch (Exception e) { + log.error("Could not advertise replication service port", e); + throw new RuntimeException(e); + } + + return sp.address; + } + + public ZooLock getLock() { return tabletServerLock; }