This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new af5972e464 Added property to configure TCP backlog for Thrift server sockets (#3752)
af5972e464 is described below

commit af5972e46498ae39e7091229b948cda6de92c2a6
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Sep 14 12:37:15 2023 -0400

    Added property to configure TCP backlog for Thrift server sockets (#3752)
    
    Closes #3751
---
 .../org/apache/accumulo/core/conf/Property.java    |  5 ++
 .../apache/accumulo/server/rpc/TServerUtils.java   | 63 +++++++++++++---------
 .../apache/accumulo/gc/SimpleGarbageCollector.java |  3 +-
 .../accumulo/test/functional/ZombieTServer.java    | 10 ++--
 .../accumulo/test/performance/NullTserver.java     |  3 +-
 5 files changed, 54 insertions(+), 30 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 0bff139725..b2338b4d27 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -51,6 +51,11 @@ public enum Property {
       "Properties in this category related to the configuration of SSL keys 
for"
           + " RPC. See also instance.ssl.enabled",
       "1.6.0"),
+  RPC_BACKLOG("rpc.backlog", "50", PropertyType.COUNT,
+      "Configures the TCP backlog for the server side sockets created by 
Thrift."
+          + " This property is not used for SSL type server sockets. A value 
of zero"
+          + " will use the Thrift default value.",
+      "2.1.3"),
   RPC_SSL_KEYSTORE_PATH("rpc.javax.net.ssl.keyStore", "", PropertyType.PATH,
       "Path of the keystore file for the server's private SSL key", "1.6.0"),
   @Sensitive
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 396fcc9f93..ff139ac003 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -60,9 +60,11 @@ import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.server.TThreadedSelectorServer;
 import org.apache.thrift.transport.TNonblockingServerSocket;
+import 
org.apache.thrift.transport.TNonblockingServerSocket.NonblockingAbstractServerSocketArgs;
 import org.apache.thrift.transport.TSSLTransportFactory;
 import org.apache.thrift.transport.TSaslServerTransport;
 import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerSocket.ServerSocketTransportArgs;
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
@@ -158,6 +160,8 @@ public class TServerUtils {
       portSearch = config.getBoolean(portSearchProperty);
     }
 
+    int backlog = config.getCount(Property.RPC_BACKLOG);
+
     final ThriftServerType serverType = context.getThriftServerType();
 
     if (serverType == ThriftServerType.SASL) {
@@ -174,7 +178,7 @@ public class TServerUtils {
       return TServerUtils.startTServer(serverType, timedProcessor, serverName, 
threadName,
           minThreads, threadTimeOut, config, timeBetweenThreadChecks, 
maxMessageSize,
           context.getServerSslParams(), context.getSaslParams(), 
context.getClientTimeoutInMillis(),
-          addresses);
+          backlog, addresses);
     } catch (TTransportException e) {
       if (portSearch) {
         // Build a list of reserved ports - as identified by properties of 
type PropertyType.PORT
@@ -199,7 +203,7 @@ public class TServerUtils {
             return TServerUtils.startTServer(serverType, timedProcessor, 
serverName, threadName,
                 minThreads, threadTimeOut, config, timeBetweenThreadChecks, 
maxMessageSize,
                 context.getServerSslParams(), context.getSaslParams(),
-                context.getClientTimeoutInMillis(), addr);
+                context.getClientTimeoutInMillis(), backlog, addr);
           } catch (TTransportException tte) {
             log.info("Unable to use port {}, retrying. (Thread Name = {})", 
port, threadName);
           }
@@ -220,11 +224,13 @@ public class TServerUtils {
   private static ServerAddress createThreadedSelectorServer(HostAndPort 
address,
       TProcessor processor, TProtocolFactory protocolFactory, final String 
serverName,
       final int numThreads, final long threadTimeOut, final 
AccumuloConfiguration conf,
-      long timeBetweenThreadChecks, long maxMessageSize) throws 
TTransportException {
+      long timeBetweenThreadChecks, long maxMessageSize, int backlog) throws 
TTransportException {
+
+    NonblockingAbstractServerSocketArgs args = new 
NonblockingAbstractServerSocketArgs()
+        .backlog(backlog).bindAddr(new InetSocketAddress(address.getHost(), 
address.getPort()))
+        .clientTimeout(0).maxFrameSize(Ints.saturatedCast(maxMessageSize));
 
-    final TNonblockingServerSocket transport =
-        new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), 
address.getPort()), 0,
-            Ints.saturatedCast(maxMessageSize));
+    final TNonblockingServerSocket transport = new 
TNonblockingServerSocket(args);
 
     TThreadedSelectorServer.Args options = new 
TThreadedSelectorServer.Args(transport);
 
@@ -256,11 +262,13 @@ public class TServerUtils {
   private static ServerAddress createNonBlockingServer(HostAndPort address, 
TProcessor processor,
       TProtocolFactory protocolFactory, final String serverName, final int 
numThreads,
       final long threadTimeOut, final AccumuloConfiguration conf, long 
timeBetweenThreadChecks,
-      long maxMessageSize) throws TTransportException {
+      long maxMessageSize, int backlog) throws TTransportException {
+
+    NonblockingAbstractServerSocketArgs args = new 
NonblockingAbstractServerSocketArgs()
+        .backlog(backlog).bindAddr(new InetSocketAddress(address.getHost(), 
address.getPort()))
+        .clientTimeout(0).maxFrameSize(Ints.saturatedCast(maxMessageSize));
 
-    final TNonblockingServerSocket transport =
-        new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), 
address.getPort()), 0,
-            Ints.saturatedCast(maxMessageSize));
+    final TNonblockingServerSocket transport = new 
TNonblockingServerSocket(args);
     final CustomNonBlockingServer.Args options = new 
CustomNonBlockingServer.Args(transport);
 
     options.protocolFactory(protocolFactory);
@@ -329,12 +337,15 @@ public class TServerUtils {
    */
   private static ServerAddress createBlockingServer(HostAndPort address, 
TProcessor processor,
       TProtocolFactory protocolFactory, long maxMessageSize, String 
serverName, int numThreads,
-      long threadTimeOut, final AccumuloConfiguration conf, long 
timeBetweenThreadChecks)
-      throws TTransportException {
+      long threadTimeOut, final AccumuloConfiguration conf, long 
timeBetweenThreadChecks,
+      int backlog) throws TTransportException {
 
     InetSocketAddress isa = new InetSocketAddress(address.getHost(), 
address.getPort());
     // Must use an ISA, providing only a port would ignore the hostname given
-    TServerSocket transport = new TServerSocket(isa);
+
+    ServerSocketTransportArgs args = new 
ServerSocketTransportArgs().backlog(backlog).bindAddr(isa);
+
+    TServerSocket transport = new TServerSocket(args);
     ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, 
numThreads, threadTimeOut,
         conf, timeBetweenThreadChecks);
     TThreadPoolServer server = createTThreadPoolServer(transport, processor,
@@ -455,7 +466,8 @@ public class TServerUtils {
   private static ServerAddress createSaslThreadPoolServer(HostAndPort address, 
TProcessor processor,
       TProtocolFactory protocolFactory, long socketTimeout, 
SaslServerConnectionParams params,
       final String serverName, final int numThreads, final long threadTimeOut,
-      final AccumuloConfiguration conf, long timeBetweenThreadChecks) throws 
TTransportException {
+      final AccumuloConfiguration conf, long timeBetweenThreadChecks, int 
backlog)
+      throws TTransportException {
     // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 
Thread that the
     // TThreadPoolServer does,
     // but sadly this isn't the case. Because TSaslTransport needs to issue a 
handshake when it
@@ -465,7 +477,10 @@ public class TServerUtils {
         address.getPort());
     InetSocketAddress isa = new InetSocketAddress(address.getHost(), 
address.getPort());
     // Must use an ISA, providing only a port would ignore the hostname given
-    TServerSocket transport = new TServerSocket(isa, (int) socketTimeout);
+    ServerSocketTransportArgs args = new 
ServerSocketTransportArgs().backlog(backlog).bindAddr(isa)
+        .clientTimeout((int) socketTimeout);
+
+    TServerSocket transport = new TServerSocket(args);
 
     String hostname, fqdn;
     try {
@@ -550,7 +565,7 @@ public class TServerUtils {
       ThriftServerType serverType, TProcessor processor, String serverName, 
String threadName,
       int numThreads, long threadTimeOut, long timeBetweenThreadChecks, long 
maxMessageSize,
       SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
-      long serverSocketTimeout, HostAndPort... addresses) {
+      long serverSocketTimeout, int backlog, HostAndPort... addresses) {
 
     if (serverType == ThriftServerType.SASL) {
       processor = updateSaslProcessor(serverType, processor);
@@ -559,7 +574,7 @@ public class TServerUtils {
     try {
       return startTServer(serverType, new TimedProcessor(processor), 
serverName, threadName,
           numThreads, threadTimeOut, conf, timeBetweenThreadChecks, 
maxMessageSize, sslParams,
-          saslParams, serverSocketTimeout, addresses);
+          saslParams, serverSocketTimeout, backlog, addresses);
     } catch (TTransportException e) {
       throw new IllegalStateException(e);
     }
@@ -576,7 +591,7 @@ public class TServerUtils {
       String serverName, String threadName, int numThreads, long threadTimeOut,
       final AccumuloConfiguration conf, long timeBetweenThreadChecks, long 
maxMessageSize,
       SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
-      long serverSocketTimeout, HostAndPort... addresses) throws 
TTransportException {
+      long serverSocketTimeout, int backlog, HostAndPort... addresses) throws 
TTransportException {
     TProtocolFactory protocolFactory = ThriftUtil.protocolFactory();
     // This is presently not supported. It's hypothetically possible, I 
believe, to work, but it
     // would require changes in how the transports
@@ -599,24 +614,24 @@ public class TServerUtils {
             log.debug("Instantiating SASL Thrift server");
             serverAddress = createSaslThreadPoolServer(address, processor, 
protocolFactory,
                 serverSocketTimeout, saslParams, serverName, numThreads, 
threadTimeOut, conf,
-                timeBetweenThreadChecks);
+                timeBetweenThreadChecks, backlog);
             break;
           case THREADPOOL:
             log.debug("Instantiating unsecure TThreadPool Thrift server");
             serverAddress =
                 createBlockingServer(address, processor, protocolFactory, 
maxMessageSize,
-                    serverName, numThreads, threadTimeOut, conf, 
timeBetweenThreadChecks);
+                    serverName, numThreads, threadTimeOut, conf, 
timeBetweenThreadChecks, backlog);
             break;
           case THREADED_SELECTOR:
             log.debug("Instantiating default, unsecure Threaded selector 
Thrift server");
-            serverAddress =
-                createThreadedSelectorServer(address, processor, 
protocolFactory, serverName,
-                    numThreads, threadTimeOut, conf, timeBetweenThreadChecks, 
maxMessageSize);
+            serverAddress = createThreadedSelectorServer(address, processor, 
protocolFactory,
+                serverName, numThreads, threadTimeOut, conf, 
timeBetweenThreadChecks,
+                maxMessageSize, backlog);
             break;
           case CUSTOM_HS_HA:
             log.debug("Instantiating unsecure custom half-async Thrift 
server");
             serverAddress = createNonBlockingServer(address, processor, 
protocolFactory, serverName,
-                numThreads, threadTimeOut, conf, timeBetweenThreadChecks, 
maxMessageSize);
+                numThreads, threadTimeOut, conf, timeBetweenThreadChecks, 
maxMessageSize, backlog);
             break;
           default:
             throw new IllegalArgumentException("Unknown server type " + 
serverType);
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index edb660a70c..8b8a4ff9c6 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -404,7 +404,8 @@ public class SimpleGarbageCollector extends AbstractServer 
implements Iface {
     ServerAddress server = TServerUtils.startTServer(getConfiguration(),
         getContext().getThriftServerType(), processor, 
this.getClass().getSimpleName(),
         "GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 
maxMessageSize,
-        getContext().getServerSslParams(), getContext().getSaslParams(), 0, 
addresses);
+        getContext().getServerSslParams(), getContext().getSaslParams(), 0,
+        getConfiguration().getCount(Property.RPC_BACKLOG), addresses);
     log.debug("Starting garbage collector listening on " + server.address);
     return server.address;
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 282fbc3f56..61c3c24e7c 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.clientImpl.thrift.ClientService;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
@@ -120,10 +121,11 @@ public class ZombieTServer {
             TabletScanClientService.Processor.class, 
TabletScanClientService.Iface.class, tch,
             context));
 
-    ServerAddress serverPort =
-        TServerUtils.startTServer(context.getConfiguration(), 
ThriftServerType.CUSTOM_HS_HA,
-            muxProcessor, "ZombieTServer", "walking dead", 2, 
ThreadPools.DEFAULT_TIMEOUT_MILLISECS,
-            1000, 10 * 1024 * 1024, null, null, -1, 
HostAndPort.fromParts("0.0.0.0", port));
+    ServerAddress serverPort = 
TServerUtils.startTServer(context.getConfiguration(),
+        ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", "walking 
dead", 2,
+        ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, 
null, -1,
+        context.getConfiguration().getCount(Property.RPC_BACKLOG),
+        HostAndPort.fromParts("0.0.0.0", port));
 
     String addressString = serverPort.address.toString();
     var zLockPath =
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index d11da541d5..c76736bb6d 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -340,7 +340,8 @@ public class NullTserver {
 
     TServerUtils.startTServer(context.getConfiguration(), 
ThriftServerType.CUSTOM_HS_HA,
         muxProcessor, "NullTServer", "null tserver", 2, 
ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000,
-        10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", 
opts.port));
+        10 * 1024 * 1024, null, null, -1, 
context.getConfiguration().getCount(Property.RPC_BACKLOG),
+        HostAndPort.fromParts("0.0.0.0", opts.port));
 
     HostAndPort addr = 
HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);
 

Reply via email to