Repository: accumulo Updated Branches: refs/heads/master 80805545e -> a0dbe56c7
ACCUMULO-3492 Consolidate duplicate and unnecessary code in TServerUtils Uses the self-resizing thread pool for all tservers and removes unused method arguments Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a0dbe56c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a0dbe56c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a0dbe56c Branch: refs/heads/master Commit: a0dbe56c75a339e396ea80da7d2092945272ad78 Parents: 8080554 Author: Josh Elser <els...@apache.org> Authored: Thu Jan 22 13:23:51 2015 -0500 Committer: Josh Elser <els...@apache.org> Committed: Thu Jan 22 13:23:51 2015 -0500 ---------------------------------------------------------------------- .../accumulo/server/rpc/TServerUtils.java | 101 ++++++++++++------- 1 file changed, 67 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/a0dbe56c/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 6a1adda..cd92e5c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -185,9 +185,37 @@ public class TServerUtils { options.stopTimeoutVal(5); // Create our own very special thread pool. - final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool"); + ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks); + + options.executorService(pool); + options.processorFactory(new TProcessorFactory(processor)); + + if (address.getPort() == 0) { + address = HostAndPort.fromParts(address.getHostText(), transport.getPort()); + } + + return new ServerAddress(new CustomNonBlockingServer(options), address); + } + + /** + * Creates a {@link SimpleThreadPool} which uses {@link SimpleTimer} to inspect the core pool size and number of active threads of the + * {@link ThreadPoolExecutor} and increase or decrease the core pool size based on activity (excessive or lack thereof). + * + * @param serverName + * A name to describe the thrift server this executor will service + * @param executorThreads + * The maximum number of threads for the executor + * @param simpleTimerThreads + * The numbers of threads used to get the {@link SimpleTimer} instance + * @param timeBetweenThreadChecks + * The amount of time, in millis, between attempts to resize the executor thread pool + * @return A {@link ThreadPoolExecutor} which will resize itself automatically + */ + public static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, final int executorThreads, int simpleTimerThreads, + long timeBetweenThreadChecks) { + final ThreadPoolExecutor pool = new SimpleThreadPool(executorThreads, "ClientPool"); // periodically adjust the number of threads we need by checking how busy our threads are - SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() { + SimpleTimer.getInstance(simpleTimerThreads).schedule(new Runnable() { @Override public void run() { // there is a minor race condition between sampling the current state of the thread pool and adjusting it @@ -199,7 +227,7 @@ public class TServerUtils { pool.setCorePoolSize(larger); } else { if (pool.getCorePoolSize() > pool.getActiveCount() + 3) { - int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1); + int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1); if (smaller != pool.getCorePoolSize()) { log.info("Decreasing server thread pool size on {} to {}", serverName, smaller); pool.setCorePoolSize(smaller); @@ -208,15 +236,7 @@ public class TServerUtils { } } }, timeBetweenThreadChecks, timeBetweenThreadChecks); - - options.executorService(pool); - options.processorFactory(new TProcessorFactory(processor)); - - if (address.getPort() == 0) { - address = HostAndPort.fromParts(address.getHostText(), transport.getPort()); - } - - return new ServerAddress(new CustomNonBlockingServer(options), address); + return pool; } /** @@ -230,10 +250,12 @@ public class TServerUtils { * Maximum size of a Thrift message allowed * @return A configured TThreadPoolServer and its bound address information */ - public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, long maxMessageSize) throws TTransportException { + public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, long maxMessageSize, String serverName, int numThreads, + int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException { TServerSocket transport = new TServerSocket(address.getPort()); - TThreadPoolServer server = createThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize)); + ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks); + TThreadPoolServer server = createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize), pool); if (address.getPort() == 0) { address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort()); @@ -244,20 +266,22 @@ public class TServerUtils { } /** - * Create a TThreadPoolServer with the given transport and processo with the default transport factory.r + * Create a {@link TThreadPoolServer} with the provided transport, processor and transport factory. * * @param transport - * TServerTransport for the server + * Server transport * @param processor - * TProcessor for the server - * @return A configured TThreadPoolServer + * Processor implementation + * @param transportFactory + * Transport factory + * @return A configured {@link TThreadPoolServer} */ - public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor) { - return createThreadPoolServer(transport, processor, ThriftUtil.transportFactory()); + public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory) { + return createTThreadPoolServer(transport, processor, transportFactory, null); } /** - * Create a TServer with the provided server transport, processor and transport factory. + * Create a {@link TThreadPoolServer} with the provided server transport, processor and transport factory. * * @param transport * TServerTransport for the server @@ -266,11 +290,15 @@ public class TServerUtils { * @param transportFactory * TTransportFactory for the server */ - public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory) { + public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory, + ExecutorService service) { TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport); options.protocolFactory(ThriftUtil.protocolFactory()); options.transportFactory(transportFactory); options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor)); + if (null != service) { + options.executorService(service); + } return new TThreadPoolServer(options); } @@ -331,7 +359,8 @@ public class TServerUtils { * SSL parameters * @return A ServerAddress with the bound-socket information and the Thrift server */ - public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams) + public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams, + String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException { TServerSocket transport; try { @@ -339,15 +368,18 @@ public class TServerUtils { } catch (UnknownHostException e) { throw new TTransportException(e); } + if (address.getPort() == 0) { address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort()); } - return new ServerAddress(createThreadPoolServer(transport, processor), address); + + ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks); + + return new ServerAddress(createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(), pool), address); } public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SaslConnectionParams params, - final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) - throws TTransportException { + final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks) throws TTransportException { // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the TThreadPoolServer does, // but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it open()'s which will fail // when the server does an accept() to (presumably) wake up the eventing system. @@ -377,9 +409,6 @@ public class TServerUtils { saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(), new SaslRpcServer.SaslGssCallbackHandler()); - // Updates the clientAddress threadlocal so we know who the client's address - final ClientInfoProcessorFactory clientInfoFactory = new ClientInfoProcessorFactory(clientAddress, processor); - // Make sure the TTransportFactory is performing a UGI.doAs TTransportFactory ugiTransportFactory = new UGIAssumingTransportFactory(saslTransportFactory, serverUser); @@ -387,8 +416,11 @@ public class TServerUtils { address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort()); } - return new ServerAddress(new TThreadPoolServer(new TThreadPoolServer.Args(transport).transportFactory(ugiTransportFactory) - .processorFactory(clientInfoFactory).protocolFactory(ThriftUtil.protocolFactory())), address); + ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks); + + final TThreadPoolServer server = createTThreadPoolServer(transport, processor, ugiTransportFactory, pool); + + return new ServerAddress(server, address); } public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor, @@ -420,16 +452,17 @@ public class TServerUtils { switch (serverType) { case SSL: log.debug("Instantiating SSL Thrift server"); - serverAddress = createSslThreadPoolServer(address, processor, serverSocketTimeout, sslParams); + serverAddress = createSslThreadPoolServer(address, processor, serverSocketTimeout, sslParams, serverName, numThreads, numSTThreads, + timeBetweenThreadChecks); break; case SASL: log.debug("Instantiating SASL Thrift server"); serverAddress = createSaslThreadPoolServer(address, processor, serverSocketTimeout, saslParams, serverName, threadName, numThreads, numSTThreads, - timeBetweenThreadChecks, maxMessageSize); + timeBetweenThreadChecks); break; case THREADPOOL: log.debug("Instantiating unsecure TThreadPool Thrift server"); - serverAddress = createBlockingServer(address, processor, maxMessageSize); + serverAddress = createBlockingServer(address, processor, maxMessageSize, serverName, numThreads, numSTThreads, timeBetweenThreadChecks); break; case CUSTOM_HS_HA: // Intentional passthrough -- Our custom wrapper around HsHa is the default default: