Merge branch '1.6' into 1.7

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/462dfbc3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/462dfbc3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/462dfbc3

Branch: refs/heads/1.8
Commit: 462dfbc3b0e323552f33d185d63daa8a1833d6d7
Parents: 4dabc2b f99ac9b
Author: Josh Elser <els...@apache.org>
Authored: Thu Jun 9 18:40:34 2016 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Thu Jun 9 18:40:34 2016 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/accumulo/server/rpc/TServerUtils.java   | 2 +-
 .../main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java | 2 +-
 .../src/main/java/org/apache/accumulo/tserver/TabletServer.java  | 4 ++--
 .../java/org/apache/accumulo/tserver/log/TabletServerLogger.java | 2 +-
 4 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 77d6412,0000000..08ef944
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@@ -1,577 -1,0 +1,577 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.rpc;
 +
 +import static com.google.common.base.Preconditions.checkArgument;
 +
 +import java.io.IOException;
 +import java.lang.reflect.Field;
 +import java.net.BindException;
 +import java.net.InetAddress;
 +import java.net.InetSocketAddress;
 +import java.net.ServerSocket;
 +import java.net.UnknownHostException;
 +import java.util.Arrays;
 +import java.util.HashSet;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.ThreadPoolExecutor;
 +
 +import javax.net.ssl.SSLServerSocket;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.rpc.SslConnectionParams;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.fate.util.LoggingRunnable;
 +import org.apache.accumulo.server.AccumuloServerContext;
 +import org.apache.accumulo.server.util.Halt;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.hadoop.security.SaslRpcServer;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.thrift.TProcessor;
 +import org.apache.thrift.TProcessorFactory;
 +import org.apache.thrift.protocol.TProtocolFactory;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.server.TThreadPoolServer;
 +import org.apache.thrift.transport.TSSLTransportFactory;
 +import org.apache.thrift.transport.TSaslServerTransport;
 +import org.apache.thrift.transport.TServerSocket;
 +import org.apache.thrift.transport.TServerTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.thrift.transport.TTransportFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.net.HostAndPort;
 +
 +/**
 + * Factory methods for creating Thrift server objects
 + */
 +public class TServerUtils {
 +  private static final Logger log = 
LoggerFactory.getLogger(TServerUtils.class);
 +
 +  /**
 +   * Static instance, passed to {@link ClientInfoProcessorFactory}, which 
will contain the client address of any incoming RPC.
 +   */
 +  public static final ThreadLocal<String> clientAddress = new 
ThreadLocal<String>();
 +
 +  /**
 +   * Start a server, at the given port, or higher, if that port is not 
available.
 +   *
 +   * @param service
 +   *          RPC configuration
 +   * @param portHintProperty
 +   *          the port to attempt to open, can be zero, meaning "any 
available port"
 +   * @param processor
 +   *          the service to be started
 +   * @param serverName
 +   *          the name of the class that is providing the service
 +   * @param threadName
 +   *          name this service's thread for better debugging
 +   * @param portSearchProperty
 +   *          A boolean Property to control if port-search should be used, or 
null to disable
 +   * @param minThreadProperty
 +   *          A Property to control the minimum number of threads in the pool
 +   * @param timeBetweenThreadChecksProperty
 +   *          A Property to control the amount of time between checks to 
resize the thread pool
 +   * @param maxMessageSizeProperty
 +   *          A Property to control the maximum Thrift message size accepted
 +   * @return the server object created, and the port actually used
 +   * @throws UnknownHostException
 +   *           when we don't know our own address
 +   */
 +  public static ServerAddress startServer(AccumuloServerContext service, 
String hostname, Property portHintProperty, TProcessor processor, String 
serverName,
 +      String threadName, Property portSearchProperty, Property 
minThreadProperty, Property timeBetweenThreadChecksProperty, Property 
maxMessageSizeProperty)
 +      throws UnknownHostException {
 +    final AccumuloConfiguration config = service.getConfiguration();
 +
 +    final int portHint = config.getPort(portHintProperty);
 +
 +    int minThreads = 2;
 +    if (minThreadProperty != null)
 +      minThreads = config.getCount(minThreadProperty);
 +
 +    long timeBetweenThreadChecks = 1000;
 +    if (timeBetweenThreadChecksProperty != null)
 +      timeBetweenThreadChecks = 
config.getTimeInMillis(timeBetweenThreadChecksProperty);
 +
 +    long maxMessageSize = 10 * 1000 * 1000;
 +    if (maxMessageSizeProperty != null)
 +      maxMessageSize = config.getMemoryInBytes(maxMessageSizeProperty);
 +
 +    boolean portSearch = false;
 +    if (portSearchProperty != null)
 +      portSearch = config.getBoolean(portSearchProperty);
 +
 +    final int simpleTimerThreadpoolSize = 
config.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE);
 +    final ThriftServerType serverType = service.getThriftServerType();
 +
 +    if (ThriftServerType.SASL == serverType) {
 +      processor = updateSaslProcessor(serverType, processor);
 +    }
 +
 +    // create the TimedProcessor outside the port search loop so we don't try 
to register the same metrics mbean more than once
 +    TimedProcessor timedProcessor = new TimedProcessor(config, processor, 
serverName, threadName);
 +
 +    Random random = new Random();
 +    for (int j = 0; j < 100; j++) {
 +
 +      // Are we going to slide around, looking for an open port?
 +      int portsToSearch = 1;
 +      if (portSearch)
 +        portsToSearch = 1000;
 +
 +      for (int i = 0; i < portsToSearch; i++) {
 +        int port = portHint + i;
 +        if (portHint != 0 && i > 0)
 +          port = 1024 + random.nextInt(65535 - 1024);
 +        if (port > 65535)
 +          port = 1024 + port % (65535 - 1024);
 +        try {
 +          HostAndPort addr = HostAndPort.fromParts(hostname, port);
 +          return TServerUtils.startTServer(addr, serverType, timedProcessor, 
serverName, threadName, minThreads, simpleTimerThreadpoolSize,
 +              timeBetweenThreadChecks, maxMessageSize, 
service.getServerSslParams(), service.getSaslParams(), 
service.getClientTimeoutInMillis());
 +        } catch (TTransportException ex) {
 +          log.error("Unable to start TServer", ex);
 +          if (ex.getCause() == null || ex.getCause().getClass() == 
BindException.class) {
 +            // Note: with a TNonblockingServerSocket a "port taken" exception 
is a cause-less
 +            // TTransportException, and with a TSocket created by 
TSSLTransportFactory, it
 +            // comes through as caused by a BindException.
 +            log.info("Unable to use port {}, retrying. (Thread Name = {})", 
port, threadName);
 +            UtilWaitThread.sleep(250);
 +          } else {
 +            // thrift is passing up a nested exception that isn't a 
BindException,
 +            // so no reason to believe retrying on a different port would 
help.
 +            log.error("Unable to start TServer", ex);
 +            break;
 +          }
 +        }
 +      }
 +    }
 +    throw new UnknownHostException("Unable to find a listen port");
 +  }
 +
 +  /**
 +   * Create a NonBlockingServer with a custom thread pool that can 
dynamically resize itself.
 +   */
 +  public static ServerAddress createNonBlockingServer(HostAndPort address, 
TProcessor processor, TProtocolFactory protocolFactory, final String serverName,
 +      String threadName, final int numThreads, final int numSTThreads, long 
timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
 +
 +    final TNonblockingServerSocket transport = new 
TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), 
address.getPort()));
 +    final CustomNonBlockingServer.Args options = new 
CustomNonBlockingServer.Args(transport);
 +
 +    options.protocolFactory(protocolFactory);
 +    options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
 +    options.maxReadBufferBytes = maxMessageSize;
 +    options.stopTimeoutVal(5);
 +
 +    // Create our own very special thread pool.
 +    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, 
numThreads, numSTThreads, timeBetweenThreadChecks);
 +
 +    options.executorService(pool);
 +    options.processorFactory(new TProcessorFactory(processor));
 +
 +    if (address.getPort() == 0) {
 +      address = HostAndPort.fromParts(address.getHostText(), 
transport.getPort());
 +    }
 +
 +    return new ServerAddress(new CustomNonBlockingServer(options), address);
 +  }
 +
 +  /**
 +   * Creates a {@link SimpleThreadPool} which uses {@link SimpleTimer} to 
inspect the core pool size and number of active threads of the
 +   * {@link ThreadPoolExecutor} and increase or decrease the core pool size 
based on activity (excessive or lack thereof).
 +   *
 +   * @param serverName
 +   *          A name to describe the thrift server this executor will service
 +   * @param executorThreads
 +   *          The maximum number of threads for the executor
 +   * @param simpleTimerThreads
 +   *          The numbers of threads used to get the {@link SimpleTimer} 
instance
 +   * @param timeBetweenThreadChecks
 +   *          The amount of time, in millis, between attempts to resize the 
executor thread pool
 +   * @return A {@link ThreadPoolExecutor} which will resize itself 
automatically
 +   */
 +  public static ThreadPoolExecutor createSelfResizingThreadPool(final String 
serverName, final int executorThreads, int simpleTimerThreads,
 +      long timeBetweenThreadChecks) {
 +    final ThreadPoolExecutor pool = new SimpleThreadPool(executorThreads, 
"ClientPool");
 +    // periodically adjust the number of threads we need by checking how busy 
our threads are
 +    SimpleTimer.getInstance(simpleTimerThreads).schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        // there is a minor race condition between sampling the current state 
of the thread pool and adjusting it
 +        // however, this isn't really an issue, since it adjusts periodically 
anyway
 +        if (pool.getCorePoolSize() <= pool.getActiveCount()) {
 +          int larger = pool.getCorePoolSize() + 
Math.min(pool.getQueue().size(), 2);
 +          log.info("Increasing server thread pool size on {} to {}", 
serverName, larger);
 +          pool.setMaximumPoolSize(larger);
 +          pool.setCorePoolSize(larger);
 +        } else {
 +          if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
 +            int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 
1);
 +            if (smaller != pool.getCorePoolSize()) {
 +              log.info("Decreasing server thread pool size on {} to {}", 
serverName, smaller);
 +              pool.setCorePoolSize(smaller);
 +            }
 +          }
 +        }
 +      }
 +    }, timeBetweenThreadChecks, timeBetweenThreadChecks);
 +    return pool;
 +  }
 +
 +  /**
 +   * Creates a TThreadPoolServer for normal unsecure operation. Useful for 
comparing performance against SSL or SASL transports.
 +   *
 +   * @param address
 +   *          Address to bind to
 +   * @param processor
 +   *          TProcessor for the server
 +   * @param maxMessageSize
 +   *          Maximum size of a Thrift message allowed
 +   * @return A configured TThreadPoolServer and its bound address information
 +   */
 +  public static ServerAddress createBlockingServer(HostAndPort address, 
TProcessor processor, TProtocolFactory protocolFactory, long maxMessageSize,
 +      String serverName, int numThreads, int numSimpleTimerThreads, long 
timeBetweenThreadChecks) throws TTransportException {
 +
 +    TServerSocket transport = new TServerSocket(address.getPort());
 +    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, 
numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
 +    TThreadPoolServer server = createTThreadPoolServer(transport, processor, 
ThriftUtil.transportFactory(maxMessageSize), protocolFactory, pool);
 +
 +    if (address.getPort() == 0) {
 +      address = HostAndPort.fromParts(address.getHostText(), 
transport.getServerSocket().getLocalPort());
 +    }
 +
 +    return new ServerAddress(server, address);
 +
 +  }
 +
 +  /**
 +   * Create a {@link TThreadPoolServer} with the provided transport, 
processor and transport factory.
 +   *
 +   * @param transport
 +   *          Server transport
 +   * @param processor
 +   *          Processor implementation
 +   * @param transportFactory
 +   *          Transport factory
 +   * @return A configured {@link TThreadPoolServer}
 +   */
 +  public static TThreadPoolServer createTThreadPoolServer(TServerTransport 
transport, TProcessor processor, TTransportFactory transportFactory,
 +      TProtocolFactory protocolFactory) {
 +    return createTThreadPoolServer(transport, processor, transportFactory, 
protocolFactory, null);
 +  }
 +
 +  /**
 +   * Create a {@link TThreadPoolServer} with the provided server transport, 
processor and transport factory.
 +   *
 +   * @param transport
 +   *          TServerTransport for the server
 +   * @param processor
 +   *          TProcessor for the server
 +   * @param transportFactory
 +   *          TTransportFactory for the server
 +   */
 +  public static TThreadPoolServer createTThreadPoolServer(TServerTransport 
transport, TProcessor processor, TTransportFactory transportFactory,
 +      TProtocolFactory protocolFactory, ExecutorService service) {
 +    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
 +    options.protocolFactory(protocolFactory);
 +    options.transportFactory(transportFactory);
 +    options.processorFactory(new ClientInfoProcessorFactory(clientAddress, 
processor));
 +    if (null != service) {
 +      options.executorService(service);
 +    }
 +    return new TThreadPoolServer(options);
 +  }
 +
 +  /**
 +   * Create the Thrift server socket for RPC running over SSL.
 +   *
 +   * @param port
 +   *          Port of the server socket to bind to
 +   * @param timeout
 +   *          Socket timeout
 +   * @param address
 +   *          Address to bind the socket to
 +   * @param params
 +   *          SSL parameters
 +   * @return A configured TServerSocket configured to use SSL
 +   */
 +  public static TServerSocket getSslServerSocket(int port, int timeout, 
InetAddress address, SslConnectionParams params) throws TTransportException {
 +    TServerSocket tServerSock;
 +    if (params.useJsse()) {
 +      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, 
params.isClientAuth(), address);
 +    } else {
 +      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, 
address, params.getTTransportParams());
 +    }
 +
 +    final ServerSocket serverSock = tServerSock.getServerSocket();
 +    if (serverSock instanceof SSLServerSocket) {
 +      SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
 +      String[] protocols = params.getServerProtocols();
 +
 +      // Be nice for the user and automatically remove protocols that might 
not exist in their JVM. Keeps us from forcing config alterations too
 +      // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
 +      Set<String> socketEnabledProtocols = new 
HashSet<String>(Arrays.asList(sslServerSock.getEnabledProtocols()));
 +      // Keep only the enabled protocols that were specified by the 
configuration
 +      socketEnabledProtocols.retainAll(Arrays.asList(protocols));
 +      if (socketEnabledProtocols.isEmpty()) {
 +        // Bad configuration...
 +        throw new RuntimeException("No available protocols available for 
secure socket. Availaable protocols: "
 +            + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", 
allowed protocols: " + Arrays.toString(protocols));
 +      }
 +
 +      // Set the protocol(s) on the server socket
 +      sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new 
String[0]));
 +    }
 +
 +    return tServerSock;
 +  }
 +
 +  /**
 +   * Create a Thrift SSL server.
 +   *
 +   * @param address
 +   *          host and port to bind to
 +   * @param processor
 +   *          TProcessor for the server
 +   * @param socketTimeout
 +   *          Socket timeout
 +   * @param sslParams
 +   *          SSL parameters
 +   * @return A ServerAddress with the bound-socket information and the Thrift 
server
 +   */
 +  public static ServerAddress createSslThreadPoolServer(HostAndPort address, 
TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout,
 +      SslConnectionParams sslParams, String serverName, int numThreads, int 
numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException 
{
 +    TServerSocket transport;
 +    try {
 +      transport = getSslServerSocket(address.getPort(), (int) socketTimeout, 
InetAddress.getByName(address.getHostText()), sslParams);
 +    } catch (UnknownHostException e) {
 +      throw new TTransportException(e);
 +    }
 +
 +    if (address.getPort() == 0) {
 +      address = HostAndPort.fromParts(address.getHostText(), 
transport.getServerSocket().getLocalPort());
 +    }
 +
 +    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, 
numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
 +
 +    return new ServerAddress(createTThreadPoolServer(transport, processor, 
ThriftUtil.transportFactory(), protocolFactory, pool), address);
 +  }
 +
 +  public static ServerAddress createSaslThreadPoolServer(HostAndPort address, 
TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout,
 +      SaslServerConnectionParams params, final String serverName, String 
threadName, final int numThreads, final int numSTThreads, long 
timeBetweenThreadChecks)
 +      throws TTransportException {
 +    // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 
Thread that the TThreadPoolServer does,
 +    // but sadly this isn't the case. Because TSaslTransport needs to issue a 
handshake when it open()'s which will fail
 +    // when the server does an accept() to (presumably) wake up the eventing 
system.
 +    log.info("Creating SASL thread pool thrift server on listening on {}:{}", 
address.getHostText(), address.getPort());
 +    TServerSocket transport = new TServerSocket(address.getPort(), (int) 
socketTimeout);
 +
 +    String hostname, fqdn;
 +    try {
 +      hostname = 
InetAddress.getByName(address.getHostText()).getCanonicalHostName();
 +      fqdn = InetAddress.getLocalHost().getCanonicalHostName();
 +    } catch (UnknownHostException e) {
 +      throw new TTransportException(e);
 +    }
 +
 +    // If we can't get a real hostname from the provided host text, use the 
hostname from DNS for localhost
 +    if ("0.0.0.0".equals(hostname)) {
 +      hostname = fqdn;
 +    }
 +
 +    // ACCUMULO-3497 an easy sanity check we can perform for the user when 
SASL is enabled. Clients and servers have to agree upon the FQDN
 +    // so that the SASL handshake can occur. If the provided hostname doesn't 
match the FQDN for this host, fail quickly and inform them to update
 +    // their configuration.
 +    if (!hostname.equals(fqdn)) {
 +      log.error(
 +          "Expected hostname of '{}' but got '{}'. Ensure the entries in the 
Accumulo hosts files (e.g. masters, slaves) are the FQDN for each host when 
using SASL.",
 +          fqdn, hostname);
 +      throw new RuntimeException("SASL requires that the address the thrift 
server listens on is the same as the FQDN for this host");
 +    }
 +
 +    final UserGroupInformation serverUser;
 +    try {
 +      serverUser = UserGroupInformation.getLoginUser();
 +    } catch (IOException e) {
 +      throw new TTransportException(e);
 +    }
 +
 +    log.debug("Logged in as {}, creating TSaslServerTransport factory with 
{}/{}", serverUser, params.getKerberosServerPrimary(), hostname);
 +
 +    // Make the SASL transport factory with the instance and primary from the 
kerberos server principal, SASL properties
 +    // and the SASL callback handler from Hadoop to ensure authorization ID 
is the authentication ID. Despite the 'protocol' argument seeming to be 
useless, it
 +    // *must* be the primary of the server.
 +    TSaslServerTransport.Factory saslTransportFactory = new 
TSaslServerTransport.Factory();
 +    saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, 
params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
 +        new SaslRpcServer.SaslGssCallbackHandler());
 +
 +    if (null != params.getSecretManager()) {
 +      log.info("Adding DIGEST-MD5 server definition for delegation tokens");
 +      saslTransportFactory.addServerDefinition(ThriftUtil.DIGEST_MD5, 
params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
 +          new SaslServerDigestCallbackHandler(params.getSecretManager()));
 +    } else {
 +      log.info("SecretManager is null, not adding support for delegation 
token authentication");
 +    }
 +
 +    // Make sure the TTransportFactory is performing a UGI.doAs
 +    TTransportFactory ugiTransportFactory = new 
UGIAssumingTransportFactory(saslTransportFactory, serverUser);
 +
 +    if (address.getPort() == 0) {
 +      // If we chose a port dynamically, make a new address using it (along with the 
proper hostname)
 +      address = HostAndPort.fromParts(address.getHostText(), 
transport.getServerSocket().getLocalPort());
 +      log.info("SASL thrift server bound on {}", address);
 +    }
 +
 +    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, 
numThreads, numSTThreads, timeBetweenThreadChecks);
 +
 +    final TThreadPoolServer server = createTThreadPoolServer(transport, 
processor, ugiTransportFactory, protocolFactory, pool);
 +
 +    return new ServerAddress(server, address);
 +  }
 +
 +  public static ServerAddress startTServer(AccumuloConfiguration conf, 
HostAndPort address, ThriftServerType serverType, TProcessor processor,
 +      String serverName, String threadName, int numThreads, int numSTThreads, 
long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams 
sslParams,
 +      SaslServerConnectionParams saslParams, long serverSocketTimeout) throws 
TTransportException {
 +
 +    if (ThriftServerType.SASL == serverType) {
 +      processor = updateSaslProcessor(serverType, processor);
 +    }
 +
 +    return startTServer(address, serverType, new TimedProcessor(conf, 
processor, serverName, threadName), serverName, threadName, numThreads, 
numSTThreads,
 +        timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, 
serverSocketTimeout);
 +  }
 +
 +  /**
 +   * @see #startTServer(HostAndPort, ThriftServerType, TimedProcessor, 
TProtocolFactory, String, String, int, int, long, long, SslConnectionParams,
 +   *      SaslServerConnectionParams, long)
 +   */
 +  public static ServerAddress startTServer(HostAndPort address, 
ThriftServerType serverType, TimedProcessor processor, String serverName, 
String threadName,
 +      int numThreads, int numSTThreads, long timeBetweenThreadChecks, long 
maxMessageSize, SslConnectionParams sslParams,
 +      SaslServerConnectionParams saslParams, long serverSocketTimeout) throws 
TTransportException {
 +    return startTServer(address, serverType, processor, 
ThriftUtil.protocolFactory(), serverName, threadName, numThreads, numSTThreads,
 +        timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, 
serverSocketTimeout);
 +  }
 +
 +  /**
 +   * Start the appropriate Thrift server (SSL or non-blocking server) for the 
given parameters. Non-null SSL parameters will cause an SSL server to be 
started.
 +   *
 +   * @return A ServerAddress encapsulating the Thrift server created and the 
host/port which it is bound to.
 +   */
 +  public static ServerAddress startTServer(HostAndPort address, 
ThriftServerType serverType, TimedProcessor processor, TProtocolFactory 
protocolFactory,
 +      String serverName, String threadName, int numThreads, int numSTThreads, 
long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams 
sslParams,
 +      SaslServerConnectionParams saslParams, long serverSocketTimeout) throws 
TTransportException {
 +
 +    // This is presently not supported. It's hypothetically possible, I 
believe, to work, but it would require changes in how the transports
 +    // work at the Thrift layer to ensure that both the SSL and SASL 
handshakes function. SASL's quality of protection addresses privacy issues.
 +    checkArgument(!(sslParams != null && saslParams != null), "Cannot start a 
Thrift server using both SSL and SASL");
 +
 +    ServerAddress serverAddress;
 +    switch (serverType) {
 +      case SSL:
 +        log.debug("Instantiating SSL Thrift server");
 +        serverAddress = createSslThreadPoolServer(address, processor, 
protocolFactory, serverSocketTimeout, sslParams, serverName, numThreads, 
numSTThreads,
 +            timeBetweenThreadChecks);
 +        break;
 +      case SASL:
 +        log.debug("Instantiating SASL Thrift server");
 +        serverAddress = createSaslThreadPoolServer(address, processor, 
protocolFactory, serverSocketTimeout, saslParams, serverName, threadName, 
numThreads,
 +            numSTThreads, timeBetweenThreadChecks);
 +        break;
 +      case THREADPOOL:
 +        log.debug("Instantiating unsecure TThreadPool Thrift server");
 +        serverAddress = createBlockingServer(address, processor, 
protocolFactory, maxMessageSize, serverName, numThreads, numSTThreads, 
timeBetweenThreadChecks);
 +        break;
 +      case CUSTOM_HS_HA: // Intentional passthrough -- Our custom wrapper 
around HsHa is the default
 +      default:
 +        log.debug("Instantiating default, unsecure custom half-async Thrift 
server");
 +        serverAddress = createNonBlockingServer(address, processor, 
protocolFactory, serverName, threadName, numThreads, numSTThreads, 
timeBetweenThreadChecks,
 +            maxMessageSize);
 +    }
 +
 +    final TServer finalServer = serverAddress.server;
 +    Runnable serveTask = new Runnable() {
 +      @Override
 +      public void run() {
 +        try {
 +          finalServer.serve();
 +        } catch (Error e) {
-           Halt.halt("Unexpected error in TThreadPoolServer " + e + ", 
halting.");
++          Halt.halt("Unexpected error in TThreadPoolServer " + e + ", 
halting.", 1);
 +        }
 +      }
 +    };
 +
 +    serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
 +    Thread thread = new Daemon(serveTask, threadName);
 +    thread.start();
 +
 +    // check for the special "bind to everything address"
 +    if (serverAddress.address.getHostText().equals("0.0.0.0")) {
 +      // can't get the address from the bind, so we'll do our best to invent 
our hostname
 +      try {
 +        serverAddress = new ServerAddress(finalServer, 
HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), 
serverAddress.address.getPort()));
 +      } catch (UnknownHostException e) {
 +        throw new TTransportException(e);
 +      }
 +    }
 +    return serverAddress;
 +  }
 +
 +  /**
 +   * Stop a Thrift TServer. Existing connections will keep our thread 
running; use reflection to forcibly shut down the threadpool.
 +   *
 +   * @param s
 +   *          The TServer to stop
 +   */
 +  public static void stopTServer(TServer s) {
 +    if (s == null)
 +      return;
 +    s.stop();
 +    try {
 +      Field f = s.getClass().getDeclaredField("executorService_");
 +      f.setAccessible(true);
 +      ExecutorService es = (ExecutorService) f.get(s);
 +      es.shutdownNow();
 +    } catch (Exception e) {
 +      log.error("Unable to call shutdownNow", e);
 +    }
 +  }
 +
 +  /**
 +   * Wrap the provided processor in the {@link UGIAssumingProcessor} so 
Kerberos authentication works. Requires the <code>serverType</code> to be
 +   * {@link ThriftServerType#SASL} and throws an exception when it is not.
 +   *
 +   * @return A {@link UGIAssumingProcessor} which wraps the provided processor
 +   */
 +  private static TProcessor updateSaslProcessor(ThriftServerType serverType, 
TProcessor processor) {
 +    checkArgument(ThriftServerType.SASL == serverType);
 +
 +    // Wrap the provided processor in our special processor which proxies the 
provided UGI on the logged-in UGI
 +    // Important that we have Timed -> UGIAssuming -> [provided] to make sure 
that the metrics are still reported
 +    // as the logged-in user.
 +    log.info("Wrapping {} in UGIAssumingProcessor", processor.getClass());
 +
 +    return new UGIAssumingProcessor(processor);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index a3b224f,8155ea6..a7abe05
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -2370,10 -3224,10 +2370,10 @@@ public class TabletServer extends Accum
  
          @Override
          public void unableToMonitorLockNode(final Throwable e) {
-           Halt.halt(0, new Runnable() {
+           Halt.halt(1, new Runnable() {
              @Override
              public void run() {
 -              log.fatal("Lost ability to monitor tablet server lock, 
exiting.", e);
 +              log.error("Lost ability to monitor tablet server lock, 
exiting.", e);
              }
            });
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index c7b6c98,158fdbd..bb8ae6f
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -213,30 -192,16 +213,30 @@@ public class TabletServerLogger 
        this.createTime = System.currentTimeMillis();
        return;
      } catch (Exception t) {
 -      throw new RuntimeException(t);
 -    }
 -  }
 +      if (null == retry) {
 +        retry = retryFactory.create();
 +      }
  
 -  public void resetLoggers() throws IOException {
 -    logSetLock.writeLock().lock();
 -    try {
 -      close();
 -    } finally {
 -      logSetLock.writeLock().unlock();
 +      // We have more retries or we exceeded the maximum number of accepted 
failures
 +      if (retry.canRetry()) {
 +        // Use the retry and record the time in which we did so
 +        retry.useRetry();
 +
 +        try {
 +          // Backoff
 +          retry.waitForNextAttempt();
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +          throw new RuntimeException(e);
 +        }
 +      } else {
 +        log.error("Repeatedly failed to create WAL. Going to exit 
tabletserver.", t);
 +        // We didn't have retries or we failed too many times.
-         Halt.halt("Experienced too many errors creating WALs, giving up");
++        Halt.halt("Experienced too many errors creating WALs, giving up", 1);
 +      }
 +
 +      // The exception will trigger the log creation to be re-attempted.
 +      throw new RuntimeException(t);
      }
    }
  

Reply via email to