This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

The following commit(s) were added to refs/heads/2.1 by this push:
     new af5972e464 Added property to configure TCP backlog for Thrift server sockets (#3752)
af5972e464 is described below

commit af5972e46498ae39e7091229b948cda6de92c2a6
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Sep 14 12:37:15 2023 -0400

    Added property to configure TCP backlog for Thrift server sockets (#3752)

    Closes #3751
---
 .../org/apache/accumulo/core/conf/Property.java    |  5 ++
 .../apache/accumulo/server/rpc/TServerUtils.java   | 63 +++++++++++++---------
 .../apache/accumulo/gc/SimpleGarbageCollector.java |  3 +-
 .../accumulo/test/functional/ZombieTServer.java    | 10 ++--
 .../accumulo/test/performance/NullTserver.java     |  3 +-
 5 files changed, 54 insertions(+), 30 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 0bff139725..b2338b4d27 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -51,6 +51,11 @@ public enum Property {
       "Properties in this category related to the configuration of SSL keys for"
           + " RPC. See also instance.ssl.enabled",
       "1.6.0"),
+  RPC_BACKLOG("rpc.backlog", "50", PropertyType.COUNT,
+      "Configures the TCP backlog for the server side sockets created by Thrift."
+          + " This property is not used for SSL type server sockets. 
A value of zero" + + " will use the Thrift default value.", + "2.1.3"), RPC_SSL_KEYSTORE_PATH("rpc.javax.net.ssl.keyStore", "", PropertyType.PATH, "Path of the keystore file for the server's private SSL key", "1.6.0"), @Sensitive diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 396fcc9f93..ff139ac003 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -60,9 +60,11 @@ import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TNonblockingServerSocket.NonblockingAbstractServerSocketArgs; import org.apache.thrift.transport.TSSLTransportFactory; import org.apache.thrift.transport.TSaslServerTransport; import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TServerSocket.ServerSocketTransportArgs; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; @@ -158,6 +160,8 @@ public class TServerUtils { portSearch = config.getBoolean(portSearchProperty); } + int backlog = config.getCount(Property.RPC_BACKLOG); + final ThriftServerType serverType = context.getThriftServerType(); if (serverType == ThriftServerType.SASL) { @@ -174,7 +178,7 @@ public class TServerUtils { return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName, minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize, context.getServerSslParams(), context.getSaslParams(), context.getClientTimeoutInMillis(), - addresses); + backlog, addresses); } catch (TTransportException e) { if (portSearch) { // Build a list 
of reserved ports - as identified by properties of type PropertyType.PORT @@ -199,7 +203,7 @@ public class TServerUtils { return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName, minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize, context.getServerSslParams(), context.getSaslParams(), - context.getClientTimeoutInMillis(), addr); + context.getClientTimeoutInMillis(), backlog, addr); } catch (TTransportException tte) { log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName); } @@ -220,11 +224,13 @@ public class TServerUtils { private static ServerAddress createThreadedSelectorServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, final String serverName, final int numThreads, final long threadTimeOut, final AccumuloConfiguration conf, - long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException { + long timeBetweenThreadChecks, long maxMessageSize, int backlog) throws TTransportException { + + NonblockingAbstractServerSocketArgs args = new NonblockingAbstractServerSocketArgs() + .backlog(backlog).bindAddr(new InetSocketAddress(address.getHost(), address.getPort())) + .clientTimeout(0).maxFrameSize(Ints.saturatedCast(maxMessageSize)); - final TNonblockingServerSocket transport = - new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()), 0, - Ints.saturatedCast(maxMessageSize)); + final TNonblockingServerSocket transport = new TNonblockingServerSocket(args); TThreadedSelectorServer.Args options = new TThreadedSelectorServer.Args(transport); @@ -256,11 +262,13 @@ public class TServerUtils { private static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, final String serverName, final int numThreads, final long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, - long maxMessageSize) throws TTransportException { + long 
maxMessageSize, int backlog) throws TTransportException { + + NonblockingAbstractServerSocketArgs args = new NonblockingAbstractServerSocketArgs() + .backlog(backlog).bindAddr(new InetSocketAddress(address.getHost(), address.getPort())) + .clientTimeout(0).maxFrameSize(Ints.saturatedCast(maxMessageSize)); - final TNonblockingServerSocket transport = - new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()), 0, - Ints.saturatedCast(maxMessageSize)); + final TNonblockingServerSocket transport = new TNonblockingServerSocket(args); final CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport); options.protocolFactory(protocolFactory); @@ -329,12 +337,15 @@ public class TServerUtils { */ private static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long maxMessageSize, String serverName, int numThreads, - long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks) - throws TTransportException { + long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, + int backlog) throws TTransportException { InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort()); // Must use an ISA, providing only a port would ignore the hostname given - TServerSocket transport = new TServerSocket(isa); + + ServerSocketTransportArgs args = new ServerSocketTransportArgs().backlog(backlog).bindAddr(isa); + + TServerSocket transport = new TServerSocket(args); ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks); TThreadPoolServer server = createTThreadPoolServer(transport, processor, @@ -455,7 +466,8 @@ public class TServerUtils { private static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout, SaslServerConnectionParams params, final String 
serverName, final int numThreads, final long threadTimeOut, - final AccumuloConfiguration conf, long timeBetweenThreadChecks) throws TTransportException { + final AccumuloConfiguration conf, long timeBetweenThreadChecks, int backlog) + throws TTransportException { // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the // TThreadPoolServer does, // but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it @@ -465,7 +477,10 @@ public class TServerUtils { address.getPort()); InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort()); // Must use an ISA, providing only a port would ignore the hostname given - TServerSocket transport = new TServerSocket(isa, (int) socketTimeout); + ServerSocketTransportArgs args = new ServerSocketTransportArgs().backlog(backlog).bindAddr(isa) + .clientTimeout((int) socketTimeout); + + TServerSocket transport = new TServerSocket(args); String hostname, fqdn; try { @@ -550,7 +565,7 @@ public class TServerUtils { ThriftServerType serverType, TProcessor processor, String serverName, String threadName, int numThreads, long threadTimeOut, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, - long serverSocketTimeout, HostAndPort... addresses) { + long serverSocketTimeout, int backlog, HostAndPort... 
addresses) { if (serverType == ThriftServerType.SASL) { processor = updateSaslProcessor(serverType, processor); @@ -559,7 +574,7 @@ public class TServerUtils { try { return startTServer(serverType, new TimedProcessor(processor), serverName, threadName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, sslParams, - saslParams, serverSocketTimeout, addresses); + saslParams, serverSocketTimeout, backlog, addresses); } catch (TTransportException e) { throw new IllegalStateException(e); } @@ -576,7 +591,7 @@ public class TServerUtils { String serverName, String threadName, int numThreads, long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, - long serverSocketTimeout, HostAndPort... addresses) throws TTransportException { + long serverSocketTimeout, int backlog, HostAndPort... addresses) throws TTransportException { TProtocolFactory protocolFactory = ThriftUtil.protocolFactory(); // This is presently not supported. 
It's hypothetically possible, I believe, to work, but it // would require changes in how the transports @@ -599,24 +614,24 @@ public class TServerUtils { log.debug("Instantiating SASL Thrift server"); serverAddress = createSaslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout, saslParams, serverName, numThreads, threadTimeOut, conf, - timeBetweenThreadChecks); + timeBetweenThreadChecks, backlog); break; case THREADPOOL: log.debug("Instantiating unsecure TThreadPool Thrift server"); serverAddress = createBlockingServer(address, processor, protocolFactory, maxMessageSize, - serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks); + serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, backlog); break; case THREADED_SELECTOR: log.debug("Instantiating default, unsecure Threaded selector Thrift server"); - serverAddress = - createThreadedSelectorServer(address, processor, protocolFactory, serverName, - numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize); + serverAddress = createThreadedSelectorServer(address, processor, protocolFactory, + serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, + maxMessageSize, backlog); break; case CUSTOM_HS_HA: log.debug("Instantiating unsecure custom half-async Thrift server"); serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName, - numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize); + numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, backlog); break; default: throw new IllegalArgumentException("Unknown server type " + serverType); diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index edb660a70c..8b8a4ff9c6 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -404,7 +404,8 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { ServerAddress server = TServerUtils.startTServer(getConfiguration(), getContext().getThriftServerType(), processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize, - getContext().getServerSslParams(), getContext().getSaslParams(), 0, addresses); + getContext().getServerSslParams(), getContext().getSaslParams(), 0, + getConfiguration().getCount(Property.RPC_BACKLOG), addresses); log.debug("Starting garbage collector listening on " + server.address); return server.address; } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index 282fbc3f56..61c3c24e7c 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.thrift.ClientService; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; @@ -120,10 +121,11 @@ public class ZombieTServer { TabletScanClientService.Processor.class, TabletScanClientService.Iface.class, tch, context)); - ServerAddress serverPort = - TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, - muxProcessor, "ZombieTServer", "walking dead", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, - 1000, 10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", port)); + ServerAddress serverPort = 
TServerUtils.startTServer(context.getConfiguration(), + ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", "walking dead", 2, + ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, null, -1, + context.getConfiguration().getCount(Property.RPC_BACKLOG), + HostAndPort.fromParts("0.0.0.0", port)); String addressString = serverPort.address.toString(); var zLockPath = diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index d11da541d5..c76736bb6d 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@ -340,7 +340,8 @@ public class NullTserver { TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, muxProcessor, "NullTServer", "null tserver", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, - 10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", opts.port)); + 10 * 1024 * 1024, null, null, -1, context.getConfiguration().getCount(Property.RPC_BACKLOG), + HostAndPort.fromParts("0.0.0.0", opts.port)); HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);