Merge branch '1.6' into 1.7

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0e3af1e8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0e3af1e8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0e3af1e8

Branch: refs/heads/master
Commit: 0e3af1e8560b0a28a4415f8da38a561965d552dc
Parents: e35e189 bd8cf5e
Author: Christopher Tubbs <ctubb...@apache.org>
Authored: Thu Dec 3 15:36:49 2015 -0500
Committer: Christopher Tubbs <ctubb...@apache.org>
Committed: Thu Dec 3 15:36:49 2015 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/accumulo/proxy/Proxy.java  |  2 +-
 .../java/org/apache/accumulo/server/rpc/RpcWrapper.java | 12 ++++++------
 .../org/apache/accumulo/server/rpc/RpcWrapperTest.java  |  9 +++------
 .../org/apache/accumulo/gc/SimpleGarbageCollector.java  |  2 +-
 .../main/java/org/apache/accumulo/master/Master.java    |  4 ++--
 .../java/org/apache/accumulo/tserver/TabletServer.java  |  4 ++--
 6 files changed, 15 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e3af1e8/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
----------------------------------------------------------------------
diff --cc proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
index ca39786,53221e0..87e2c58
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@@ -161,113 -117,38 +161,113 @@@ public class Proxy implements KeywordEx
  
      Class<? extends TProtocolFactory> protoFactoryClass = 
Class.forName(opts.prop.getProperty("protocolFactory", 
TCompactProtocol.Factory.class.getName()))
          .asSubclass(TProtocolFactory.class);
 +    TProtocolFactory protoFactory = protoFactoryClass.newInstance();
      int port = Integer.parseInt(opts.prop.getProperty("port"));
 -    TServer server = createProxyServer(AccumuloProxy.class, 
ProxyServer.class, port, protoFactoryClass, opts.prop);
 -    server.serve();
 +    String hostname = opts.prop.getProperty(THRIFT_SERVER_HOSTNAME, 
THRIFT_SERVER_HOSTNAME_DEFAULT);
 +    HostAndPort address = HostAndPort.fromParts(hostname, port);
 +    ServerAddress server = createProxyServer(address, protoFactory, 
opts.prop);
 +    // Wait for the server to come up
 +    while (!server.server.isServing()) {
 +      Thread.sleep(100);
 +    }
 +    log.info("Proxy server started on " + server.getAddress());
 +    while (server.server.isServing()) {
 +      Thread.sleep(1000);
 +    }
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +    new Proxy().execute(args);
    }
  
 -  public static TServer createProxyServer(Class<?> api, Class<?> implementor, 
final int port, Class<? extends TProtocolFactory> protoClass,
 -      Properties properties) throws Exception {
 -    Class<?> proxyIfaceClass = Class.forName(api.getName() + "$Iface");
 -    return createProxyServer(proxyIfaceClass, api, implementor, port, 
protoClass, properties);
 +  public static ServerAddress createProxyServer(HostAndPort address, 
TProtocolFactory protocolFactory, Properties properties) throws Exception {
 +    return createProxyServer(address, protocolFactory, properties, 
ClientConfiguration.loadDefault());
    }
  
 -  private static <I,P extends TBaseProcessor<I>> TServer 
createProxyServer(final Class<I> proxyIfaceClass, Class<?> api, final Class<?> 
implementor,
 -      final int port, final Class<? extends TProtocolFactory> protoClass, 
final Properties properties) throws Exception {
 -    @SuppressWarnings("unchecked")
 -    Class<P> proxyProcClass = (Class<P>) Class.forName(api.getName() + 
"$Processor");
 -
 -    // create the implementor
 -    @SuppressWarnings("unchecked")
 -    I impl = (I) 
implementor.getConstructor(Properties.class).newInstance(properties);
 -
 -    Constructor<P> proxyProcConstructor = 
proxyProcClass.getConstructor(proxyIfaceClass);
 -    P processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl, 
proxyProcConstructor.newInstance(impl)));
 -
 -    TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
 -    THsHaServer.Args args = new THsHaServer.Args(socket);
 -    args.processor(processor);
 -    final long maxFrameSize = 
AccumuloConfiguration.getMemoryInBytes(properties.getProperty("maxFrameSize", 
"16M"));
 -    if (maxFrameSize > Integer.MAX_VALUE)
 -      throw new RuntimeException(maxFrameSize + " is larger than MAX_INT");
 -    args.transportFactory(new TFramedTransport.Factory((int) maxFrameSize));
 -    args.protocolFactory(protoClass.newInstance());
 -    args.maxReadBufferBytes = maxFrameSize;
 -    return new THsHaServer(args);
 +  public static ServerAddress createProxyServer(HostAndPort address, 
TProtocolFactory protocolFactory, Properties properties, ClientConfiguration 
clientConf)
 +      throws Exception {
 +    final int numThreads = 
Integer.parseInt(properties.getProperty(THRIFT_THREAD_POOL_SIZE_KEY, 
THRIFT_THREAD_POOL_SIZE_DEFAULT));
 +    final long maxFrameSize = 
AccumuloConfiguration.getMemoryInBytes(properties.getProperty(THRIFT_MAX_FRAME_SIZE_KEY,
 THRIFT_MAX_FRAME_SIZE_DEFAULT));
 +    final int simpleTimerThreadpoolSize = 
Integer.parseInt(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE.getDefaultValue());
 +    // How frequently to try to resize the thread pool
 +    final long threadpoolResizeInterval = 1000l * 5;
 +    // No timeout
 +    final long serverSocketTimeout = 0l;
 +    // Use the new hadoop metrics2 support
 +    final MetricsFactory metricsFactory = new MetricsFactory(false);
 +    final String serverName = "Proxy", threadName = "Accumulo Thrift Proxy";
 +
 +    // create the implementation of the proxy interface
 +    ProxyServer impl = new ProxyServer(properties);
 +
 +    // Wrap the implementation -- translate some exceptions
-     AccumuloProxy.Iface wrappedImpl = RpcWrapper.service(impl, new 
AccumuloProxy.Processor<AccumuloProxy.Iface>(impl).getProcessMapView());
++    AccumuloProxy.Iface wrappedImpl = RpcWrapper.service(impl, new 
AccumuloProxy.Processor<AccumuloProxy.Iface>(impl));
 +
 +    // Create the processor from the implementation
 +    TProcessor processor = new 
AccumuloProxy.Processor<AccumuloProxy.Iface>(wrappedImpl);
 +
 +    // Get the type of thrift server to instantiate
 +    final String serverTypeStr = properties.getProperty(THRIFT_SERVER_TYPE, 
THRIFT_SERVER_TYPE_DEFAULT);
 +    ThriftServerType serverType = DEFAULT_SERVER_TYPE;
 +    if (!THRIFT_SERVER_TYPE_DEFAULT.equals(serverTypeStr)) {
 +      serverType = ThriftServerType.get(serverTypeStr);
 +    }
 +
 +    SslConnectionParams sslParams = null;
 +    SaslServerConnectionParams saslParams = null;
 +    switch (serverType) {
 +      case SSL:
 +        sslParams = 
SslConnectionParams.forClient(ClientContext.convertClientConfig(clientConf));
 +        break;
 +      case SASL:
 +        if 
(!clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), 
false)) {
 +          // ACCUMULO-3651 Changed level to error and added FATAL to message 
for slf4j capability
 +          log.error("FATAL: SASL thrift server was requested but it is 
disabled in client configuration");
 +          throw new RuntimeException("SASL is not enabled in configuration");
 +        }
 +
 +        // Kerberos needs to be enabled to use it
 +        if (!UserGroupInformation.isSecurityEnabled()) {
 +          // ACCUMULO-3651 Changed level to error and added FATAL to message 
for slf4j capability
 +          log.error("FATAL: Hadoop security is not enabled");
 +          throw new RuntimeException();
 +        }
 +
 +        // Login via principal and keytab
 +        final String kerberosPrincipal = 
properties.getProperty(KERBEROS_PRINCIPAL, ""),
 +        kerberosKeytab = properties.getProperty(KERBEROS_KEYTAB, "");
 +        if (StringUtils.isBlank(kerberosPrincipal) || 
StringUtils.isBlank(kerberosKeytab)) {
 +          // ACCUMULO-3651 Changed level to error and added FATAL to message 
for slf4j capability
 +          log.error("FATAL: Kerberos principal and keytab must be provided");
 +          throw new RuntimeException();
 +        }
 +        UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, 
kerberosKeytab);
 +        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 +        log.info("Logged in as " + ugi.getUserName());
 +
 +        // The kerberosPrimary set in the SASL server needs to match the 
principal we're logged in as.
 +        final String shortName = ugi.getShortUserName();
 +        log.info("Setting server primary to {}", shortName);
 +        clientConf.setProperty(ClientProperty.KERBEROS_SERVER_PRIMARY, 
shortName);
 +
 +        KerberosToken token = new KerberosToken();
 +        saslParams = new SaslServerConnectionParams(clientConf, token, null);
 +
 +        processor = new UGIAssumingProcessor(processor);
 +
 +        break;
 +      default:
 +        // nothing to do -- no extra configuration necessary
 +        break;
 +    }
 +
 +    // Hook up support for tracing for thrift calls
 +    TimedProcessor timedProcessor = new TimedProcessor(metricsFactory, 
processor, serverName, threadName);
 +
 +    // Create the thrift server with our processor and properties
 +    ServerAddress serverAddr = TServerUtils.startTServer(address, serverType, 
timedProcessor, protocolFactory, serverName, threadName, numThreads,
 +        simpleTimerThreadpoolSize, threadpoolResizeInterval, maxFrameSize, 
sslParams, saslParams, serverSocketTimeout);
 +
 +    return serverAddr;
    }
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e3af1e8/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
----------------------------------------------------------------------
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
index 585eb27,0000000..ec68166
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
@@@ -1,128 -1,0 +1,128 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.rpc;
 +
 +import java.lang.reflect.InvocationHandler;
 +import java.lang.reflect.Method;
 +import java.lang.reflect.Proxy;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
 +import org.apache.accumulo.core.trace.wrappers.TraceWrap;
 +import org.apache.thrift.ProcessFunction;
 +import org.apache.thrift.TApplicationException;
- import org.apache.thrift.TBase;
++import org.apache.thrift.TBaseProcessor;
 +import org.apache.thrift.TException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * This class accommodates the changes in THRIFT-1805, which appeared in 
Thrift 0.9.1 and restricts client-side notification of server-side errors to
 + * {@link TException} only, by wrapping {@link RuntimeException} and {@link 
Error} as {@link TException}, so it doesn't just close the connection and look 
like
 + * a network issue, but informs the client that a {@link 
TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs 
similar functions as
 + * {@link TraceWrap}, but with the additional action of translating 
exceptions. See also ACCUMULO-1691 and ACCUMULO-2950.
 + *
 + * ACCUMULO-4065 found that the above exception-wrapping is not appropriate 
for Thrift's implementation of oneway methods. Oneway methods are defined as a
 + * method which the client does not wait for it to return. Normally, this is 
acceptable as these methods are void. Therefore, if another client reuses the
 + * connection to send a new RPC, there is no "extra" data sitting on the 
InputStream from the Socket (that the server sent). However, the implementation 
of a
 + * oneway method <em>does</em> send a response to the client when the 
implementation throws a {@link TException}. This message showing up on the 
client's
 + * InputStream causes future use of the Thrift Connection to become unusable. 
As long as the Thrift implementation sends a message back when oneway methods
 + * throw a {@link TException}, we much make sure that we don't 
re-wrap-and-throw any exceptions as {@link TException}s.
 + *
 + * @since 1.6.1
 + */
 +public class RpcWrapper {
 +  private static final Logger log = LoggerFactory.getLogger(RpcWrapper.class);
 +
-   public static <T> T service(final T instance, @SuppressWarnings("rawtypes") 
final Map<String,ProcessFunction<T,? extends TBase>> processorView) {
++  public static <I> I service(final I instance, final TBaseProcessor<I> 
processor) {
++    final Map<String,ProcessFunction<I,?>> processorView = 
processor.getProcessMapView();
 +    final Set<String> onewayMethods = getOnewayMethods(processorView);
 +    log.debug("Found oneway Thrift methods: " + onewayMethods);
 +
 +    InvocationHandler handler = getInvocationHandler(instance, onewayMethods);
 +
 +    @SuppressWarnings("unchecked")
-     T proxiedInstance = (T) 
Proxy.newProxyInstance(instance.getClass().getClassLoader(), 
instance.getClass().getInterfaces(), handler);
++    I proxiedInstance = (I) 
Proxy.newProxyInstance(instance.getClass().getClassLoader(), 
instance.getClass().getInterfaces(), handler);
 +    return proxiedInstance;
 +  }
 +
 +  protected static <T> RpcServerInvocationHandler<T> 
getInvocationHandler(final T instance, final Set<String> onewayMethods) {
 +    return new RpcServerInvocationHandler<T>(instance) {
 +      private final Logger log = LoggerFactory.getLogger(instance.getClass());
 +
 +      @Override
 +      public Object invoke(Object obj, Method method, Object[] args) throws 
Throwable {
 +        // e.g. ThriftClientHandler.flush(TInfo, TCredentials, ...)
 +        try {
 +          return super.invoke(obj, method, args);
 +        } catch (RuntimeException e) {
 +          String msg = e.getMessage();
 +          log.error(msg, e);
 +          if (onewayMethods.contains(method.getName())) {
 +            throw e;
 +          }
 +          throw new TException(msg);
 +        } catch (Error e) {
 +          String msg = e.getMessage();
 +          log.error(msg, e);
 +          if (onewayMethods.contains(method.getName())) {
 +            throw e;
 +          }
 +          throw new TException(msg);
 +        }
 +      }
 +    };
 +  }
 +
-   protected static <T> Set<String> 
getOnewayMethods(@SuppressWarnings("rawtypes") Map<String,ProcessFunction<T,? 
extends TBase>> processorView) {
++  protected static Set<String> getOnewayMethods(Map<String,?> processorView) {
 +    // Get a handle on the isOnewayMethod and make it accessible
 +    final Method isOnewayMethod;
 +    try {
 +      isOnewayMethod = ProcessFunction.class.getDeclaredMethod("isOneway");
 +    } catch (NoSuchMethodException e) {
 +      throw new RuntimeException("Could not access isOneway method", e);
 +    } catch (SecurityException e) {
 +      throw new RuntimeException("Could not access isOneway method", e);
 +    }
 +    // In java7, this appears to be copying the method, but it's trivial for 
us to return the object to how it was before.
 +    final boolean accessible = isOnewayMethod.isAccessible();
 +    isOnewayMethod.setAccessible(true);
 +
 +    try {
 +      final Set<String> onewayMethods = new HashSet<String>();
-       for (@SuppressWarnings("rawtypes")
-       Entry<String,ProcessFunction<T,? extends TBase>> entry : 
processorView.entrySet()) {
++      for (Entry<String,?> entry : processorView.entrySet()) {
 +        try {
 +          if ((Boolean) isOnewayMethod.invoke(entry.getValue())) {
 +            onewayMethods.add(entry.getKey());
 +          }
 +        } catch (RuntimeException e) {
 +          throw e;
 +        } catch (Exception e) {
 +          throw new RuntimeException(e);
 +        }
 +      }
 +
 +      return onewayMethods;
 +    } finally {
 +      // Reset it back to how it was.
 +      isOnewayMethod.setAccessible(accessible);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e3af1e8/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java
----------------------------------------------------------------------
diff --cc 
server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java
index e4e9402,0000000..39d3705
mode 100644,000000..100644
--- 
a/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java
@@@ -1,304 -1,0 +1,301 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to you under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.rpc;
 +
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
 +import org.apache.accumulo.server.rpc.RpcWrapper;
 +import org.apache.thrift.ProcessFunction;
 +import org.apache.thrift.TBase;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.protocol.TProtocol;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Sets;
 +
 +/**
 + * Verification that RpcWrapper correctly mangles Exceptions to work around 
Thrift.
 + */
 +public class RpcWrapperTest {
 +
 +  private static final String RTE_MESSAGE = "RpcWrapperTest's 
RuntimeException Message";
 +
 +  /**
 +   * Given a method name and whether or not the method is oneway, construct a 
ProcessFunction.
 +   *
 +   * @param methodName
 +   *          The service method name.
 +   * @param isOneway
 +   *          Is the method oneway.
 +   * @return A ProcessFunction.
 +   */
 +  private fake_proc<FakeService> createProcessFunction(String methodName, 
boolean isOneway) {
 +    return new fake_proc<FakeService>(methodName, isOneway);
 +  }
 +
 +  @Test
 +  public void testSomeOnewayMethods() {
-     @SuppressWarnings("rawtypes")
-     Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new 
HashMap<String,ProcessFunction<FakeService,? extends TBase>>();
++    Map<String,ProcessFunction<FakeService,?>> procs = new 
HashMap<String,ProcessFunction<FakeService,?>>();
 +    procs.put("foo", createProcessFunction("foo", true));
 +    procs.put("foobar", createProcessFunction("foobar", false));
 +    procs.put("bar", createProcessFunction("bar", true));
 +    procs.put("barfoo", createProcessFunction("barfoo", false));
 +
 +    Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs);
 +    Assert.assertEquals(Sets.newHashSet("foo", "bar"), onewayMethods);
 +  }
 +
 +  @Test
 +  public void testNoOnewayMethods() {
-     @SuppressWarnings("rawtypes")
-     Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new 
HashMap<String,ProcessFunction<FakeService,? extends TBase>>();
++    Map<String,ProcessFunction<FakeService,?>> procs = new 
HashMap<String,ProcessFunction<FakeService,?>>();
 +    procs.put("foo", createProcessFunction("foo", false));
 +    procs.put("foobar", createProcessFunction("foobar", false));
 +    procs.put("bar", createProcessFunction("bar", false));
 +    procs.put("barfoo", createProcessFunction("barfoo", false));
 +
 +    Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs);
 +    Assert.assertEquals(Collections.<String> emptySet(), onewayMethods);
 +  }
 +
 +  @Test
 +  public void testAllOnewayMethods() {
-     @SuppressWarnings("rawtypes")
-     Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new 
HashMap<String,ProcessFunction<FakeService,? extends TBase>>();
++    Map<String,ProcessFunction<FakeService,?>> procs = new 
HashMap<String,ProcessFunction<FakeService,?>>();
 +    procs.put("foo", createProcessFunction("foo", true));
 +    procs.put("foobar", createProcessFunction("foobar", true));
 +    procs.put("bar", createProcessFunction("bar", true));
 +    procs.put("barfoo", createProcessFunction("barfoo", true));
 +
 +    Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs);
 +    Assert.assertEquals(Sets.newHashSet("foo", "foobar", "bar", "barfoo"), 
onewayMethods);
 +  }
 +
 +  @Test
 +  public void testNoExceptionWrappingForOneway() throws Throwable {
 +    final Object[] args = new Object[0];
 +
 +    final FakeService impl = new FakeServiceImpl();
 +
 +    // "short" names throw RTEs and are oneway, while long names do not throw 
exceptions and are not oneway.
 +    RpcServerInvocationHandler<FakeService> handler = 
RpcWrapper.getInvocationHandler(impl, Sets.newHashSet("foo", "bar"));
 +
 +    // Should throw an exception, but not be wrapped because the method is 
oneway
 +    try {
 +      handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args);
 +      Assert.fail("Expected an exception");
 +    } catch (RuntimeException e) {
 +      Assert.assertEquals(RTE_MESSAGE, e.getMessage());
 +    }
 +
 +    // Should not throw an exception
 +    handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args);
 +  }
 +
 +  @Test
 +  public void testExceptionWrappingForNonOneway() throws Throwable {
 +    final Object[] args = new Object[0];
 +
 +    final FakeService impl = new FakeServiceImpl();
 +
 +    // "short" names throw RTEs and are not oneway, while long names do not 
throw exceptions and are oneway.
 +    RpcServerInvocationHandler<FakeService> handler = 
RpcWrapper.getInvocationHandler(impl, Sets.newHashSet("foobar", "barfoo"));
 +
 +    // Should throw an exception, but not be wrapped because the method is 
oneway
 +    try {
 +      handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args);
 +      Assert.fail("Expected an exception");
 +    } catch (TException e) {
 +      // The InvocationHandler should take the exception from the RTE and 
make it a TException
 +      Assert.assertEquals(RTE_MESSAGE, e.getMessage());
 +    }
 +
 +    // Should not throw an exception
 +    handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args);
 +  }
 +
 +  //
 +  // Some hacked together classes/interfaces that mimic what Thrift is doing.
 +  //
 +
 +  /**
 +   * Some fake fields for our fake arguments.
 +   */
 +  private static class fake_fields implements org.apache.thrift.TFieldIdEnum {
 +    @Override
 +    public short getThriftFieldId() {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public String getFieldName() {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public boolean equals(Object o) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public int hashCode() {
 +      throw new UnsupportedOperationException();
 +    }
 +  }
 +
 +  /**
 +   * A fake thrift service
 +   */
 +  interface FakeService {
 +    void foo();
 +
 +    String foobar();
 +
 +    int bar();
 +
 +    long barfoo();
 +  }
 +
 +  /**
 +   * An implementation of the fake thrift service. The "short" names throw 
RTEs, while long names do not.
 +   */
 +  public static class FakeServiceImpl implements FakeService {
 +    @Override
 +    public void foo() {
 +      throw new RuntimeException(RTE_MESSAGE);
 +    }
 +
 +    @Override
 +    public String foobar() {
 +      return "";
 +    }
 +
 +    @Override
 +    public int bar() {
 +      throw new RuntimeException(RTE_MESSAGE);
 +    }
 +
 +    @Override
 +    public long barfoo() {
 +      return 0;
 +    }
 +  };
 +
 +  /**
 +   * A fake ProcessFunction implementation for testing that allows injection 
of method name and oneway.
 +   */
 +  private static class fake_proc<I extends FakeService> extends 
org.apache.thrift.ProcessFunction<I,foo_args> {
 +    final private boolean isOneway;
 +
 +    public fake_proc(String methodName, boolean isOneway) {
 +      super(methodName);
 +      this.isOneway = isOneway;
 +    }
 +
 +    @Override
 +    protected boolean isOneway() {
 +      return isOneway;
 +    }
 +
 +    @SuppressWarnings("rawtypes")
 +    @Override
 +    public TBase getResult(I iface, foo_args args) throws TException {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public foo_args getEmptyArgsInstance() {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public boolean equals(Object o) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public int hashCode() {
 +      throw new UnsupportedOperationException();
 +    }
 +  }
 +
 +  /**
 +   * Fake arguments for our fake service.
 +   */
 +  private static class foo_args implements 
org.apache.thrift.TBase<foo_args,fake_fields> {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public boolean equals(Object o) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public int hashCode() {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public int compareTo(foo_args o) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void read(TProtocol iprot) throws TException {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void write(TProtocol oprot) throws TException {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public fake_fields fieldForId(int fieldId) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public boolean isSet(fake_fields field) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public Object getFieldValue(fake_fields field) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void setFieldValue(fake_fields field, Object value) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public TBase<foo_args,fake_fields> deepCopy() {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void clear() {
 +      throw new UnsupportedOperationException();
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e3af1e8/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --cc 
server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 4d55461,1c6836b..6bb8f9a
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@@ -709,16 -702,9 +709,16 @@@ public class SimpleGarbageCollector ext
    }
  
    private HostAndPort startStatsService() throws UnknownHostException {
-     Iface rpcProxy = RpcWrapper.service(this, new 
Processor<Iface>(this).getProcessMapView());
 -    Processor<Iface> processor = new 
Processor<Iface>(RpcWrapper.service(this, new Processor<Iface>(this)));
 -    int port = config.getPort(Property.GC_PORT);
 -    long maxMessageSize = 
config.getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
++    Iface rpcProxy = RpcWrapper.service(this, new Processor<Iface>(this));
 +    final Processor<Iface> processor;
 +    if (ThriftServerType.SASL == getThriftServerType()) {
 +      Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, 
getClass(), getConfiguration());
 +      processor = new Processor<Iface>(tcProxy);
 +    } else {
 +      processor = new Processor<Iface>(rpcProxy);
 +    }
 +    int port = getConfiguration().getPort(Property.GC_PORT);
 +    long maxMessageSize = 
getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
      HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port);
      log.debug("Starting garbage collector listening on " + result);
      try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e3af1e8/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 02e1132,0434714..1370989
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -1147,38 -1032,10 +1147,38 @@@ public class Master extends AccumuloSer
        throw new IOException(e);
      }
  
 -    Processor<Iface> processor = new Processor<Iface>(
 -        RpcWrapper.service(new MasterClientServiceHandler(this), new 
Processor<Iface>(new MasterClientServiceHandler(this))));
 -    ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), 
hostname, Property.MASTER_CLIENTPORT, processor, "Master",
 -        "Master Client Service Handler", null, Property.MASTER_MINTHREADS, 
Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
 +    ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot);
 +
 +    // Make sure that we have a secret key (either a new one or an old one 
from ZK) before we start
 +    // the master client service.
 +    if (null != authenticationTokenKeyManager && null != keyDistributor) {
 +      log.info("Starting delegation-token key manager");
 +      keyDistributor.initialize();
 +      authenticationTokenKeyManager.start();
 +      boolean logged = false;
 +      while (!authenticationTokenKeyManager.isInitialized()) {
 +        // Print out a status message when we start waiting for the key 
manager to get initialized
 +        if (!logged) {
 +          log.info("Waiting for AuthenticationTokenKeyManager to be 
initialized");
 +          logged = true;
 +        }
 +        UtilWaitThread.sleep(200);
 +      }
 +      // And log when we are initialized
 +      log.info("AuthenticationTokenSecretManager is initialized");
 +    }
 +
 +    clientHandler = new MasterClientServiceHandler(this);
-     Iface rpcProxy = RpcWrapper.service(clientHandler, new 
Processor<Iface>(clientHandler).getProcessMapView());
++    Iface rpcProxy = RpcWrapper.service(clientHandler, new 
Processor<Iface>(clientHandler));
 +    final Processor<Iface> processor;
 +    if (ThriftServerType.SASL == getThriftServerType()) {
 +      Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, 
clientHandler.getClass(), getConfiguration());
 +      processor = new Processor<Iface>(tcredsProxy);
 +    } else {
 +      processor = new Processor<Iface>(rpcProxy);
 +    }
 +    ServerAddress sa = TServerUtils.startServer(this, hostname, 
Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service 
Handler", null,
 +        Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, 
Property.GENERAL_MAX_MESSAGE_SIZE);
      clientService = sa.server;
      String address = sa.address.toString();
      log.info("Setting master lock data to " + address);
@@@ -1187,43 -1044,6 +1187,43 @@@
      while (!clientService.isServing()) {
        UtilWaitThread.sleep(100);
      }
 +
 +    // Start the daemon to scan the replication table and make units of work
 +    replicationWorkDriver = new ReplicationDriver(this);
 +    replicationWorkDriver.start();
 +
 +    // Start the daemon to assign work to tservers to replicate to our peers
 +    try {
 +      replicationWorkAssigner = new WorkDriver(this);
 +    } catch (AccumuloException | AccumuloSecurityException e) {
 +      log.error("Caught exception trying to initialize replication 
WorkDriver", e);
 +      throw new RuntimeException(e);
 +    }
 +    replicationWorkAssigner.start();
 +
 +    // Start the replication coordinator which assigns tservers to service 
replication requests
 +    MasterReplicationCoordinator impl = new 
MasterReplicationCoordinator(this);
 +    ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> 
replicationCoordinatorProcessor = new 
ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(
-         RpcWrapper.service(impl, new 
ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(impl).getProcessMapView()));
++        RpcWrapper.service(impl, new 
ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(impl)));
 +    ServerAddress replAddress = TServerUtils.startServer(this, hostname, 
Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor,
 +        "Master Replication Coordinator", "Replication Coordinator", null, 
Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
 +        Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, 
Property.GENERAL_MAX_MESSAGE_SIZE);
 +
 +    log.info("Started replication coordinator service at " + 
replAddress.address);
 +
 +    // Advertise that port we used so peers don't have to be told what it is
 +    
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) 
+ Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR,
 +        replAddress.address.toString().getBytes(UTF_8), 
NodeExistsPolicy.OVERWRITE);
 +
 +    // Register replication metrics
 +    MasterMetricsFactory factory = new 
MasterMetricsFactory(getConfiguration(), this);
 +    Metrics replicationMetrics = factory.createReplicationMetrics();
 +    try {
 +      replicationMetrics.register();
 +    } catch (Exception e) {
 +      log.error("Failed to register replication metrics", e);
 +    }
 +
      while (clientService.isServing()) {
        UtilWaitThread.sleep(500);
      }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e3af1e8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 3022a76,dce66b0..1080d8d
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -2332,46 -3160,14 +2332,46 @@@ public class TabletServer extends Accum
    private HostAndPort startTabletClientService() throws UnknownHostException {
      // start listening for client connection last
      ThriftClientHandler handler = new ThriftClientHandler();
-     Iface rpcProxy = RpcWrapper.service(handler, new 
Processor<Iface>(handler).getProcessMapView());
 -    Iface tch = RpcWrapper.service(handler, new Processor<Iface>(handler));
 -    Processor<Iface> processor = new Processor<Iface>(tch);
 -    HostAndPort address = startServer(getSystemConfiguration(), 
clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor, "Thrift 
Client Server");
++    Iface rpcProxy = RpcWrapper.service(handler, new 
Processor<Iface>(handler));
 +    final Processor<Iface> processor;
 +    if (ThriftServerType.SASL == getThriftServerType()) {
 +      Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, 
ThriftClientHandler.class, getConfiguration());
 +      processor = new Processor<Iface>(tcredProxy);
 +    } else {
 +      processor = new Processor<Iface>(rpcProxy);
 +    }
 +    HostAndPort address = 
startServer(getServerConfigurationFactory().getConfiguration(), 
clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor,
 +        "Thrift Client Server");
      log.info("address = " + address);
      return address;
    }
  
 -  ZooLock getLock() {
 +  private HostAndPort startReplicationService() throws UnknownHostException {
 +    final ReplicationServicerHandler handler = new 
ReplicationServicerHandler(this);
-     ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler, new 
ReplicationServicer.Processor<ReplicationServicer.Iface>(handler).getProcessMapView());
++    ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler, new 
ReplicationServicer.Processor<ReplicationServicer.Iface>(handler));
 +    ReplicationServicer.Iface repl = 
TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass(), 
getConfiguration());
 +    ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new 
ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
 +    AccumuloConfiguration conf = 
getServerConfigurationFactory().getConfiguration();
 +    Property maxMessageSizeProperty = 
(conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? 
Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
 +    ServerAddress sp = TServerUtils.startServer(this, 
clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT, 
processor,
 +        "ReplicationServicerHandler", "Replication Servicer", null, 
Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, 
maxMessageSizeProperty);
 +    this.replServer = sp.server;
 +    log.info("Started replication service on " + sp.address);
 +
 +    try {
 +      // The replication service is unique to the thrift service for a 
tserver, not just a host.
 +      // Advertise the host and port for replication service given the host 
and port for the tserver.
 +      
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) 
+ ReplicationConstants.ZOO_TSERVERS + "/" + clientAddress.toString(),
 +          sp.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
 +    } catch (Exception e) {
 +      log.error("Could not advertise replication service port", e);
 +      throw new RuntimeException(e);
 +    }
 +
 +    return sp.address;
 +  }
 +
 +  public ZooLock getLock() {
      return tabletServerLock;
    }
  

Reply via email to