This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-proxy.git


The following commit(s) were added to refs/heads/main by this push:
     new 63c9d52  Update to accumulo 2.1.1 (#81)
63c9d52 is described below

commit 63c9d5232783c76381079943e985461532e00bb7
Author: magrable <85767620+magra...@users.noreply.github.com>
AuthorDate: Mon Dec 2 15:50:58 2024 -0500

    Update to accumulo 2.1.1 (#81)
    
    * Moved TServerUtils into proxy code to maintain support for other Thrift 
Protocols
    
    ---------
    
    Co-authored-by: magrable <magra...@users.noreply.github.com>
---
 Dockerfile                                         |   4 +-
 pom.xml                                            |   2 +-
 src/main/java/org/apache/accumulo/proxy/Proxy.java |   4 +-
 .../org/apache/accumulo/proxy/TServerUtils.java    | 525 +++++++++++++++++++++
 4 files changed, 529 insertions(+), 6 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 4027d8d..b40bfa9 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -21,12 +21,12 @@ WORKDIR /opt/accumulo-proxy
 
 ARG HADOOP_VERSION=3.3.4
 ARG ZOOKEEPER_VERSION=3.8.0
-ARG ACCUMULO_VERSION=2.1.0
+ARG ACCUMULO_VERSION=2.1.1
 ARG ACCUMULO_PROXY_VERSION=2.0.0-SNAPSHOT
 
 ARG 
HADOOP_HASH=ca5e12625679ca95b8fd7bb7babc2a8dcb2605979b901df9ad137178718821097b67555115fafc6dbf6bb32b61864ccb6786dbc555e589694a22bf69147780b4
 ARG 
ZOOKEEPER_HASH=d66e3a40451f840406901b2cd940992b001f92049a372ae48d8b420891605871cd1ae5f6cceb3b10665491e7abef36a4078dace158bd1e0938fcd3567b5234ca
-ARG 
ACCUMULO_HASH=9bbc4defc114013f145e9e9fdd08683c842a29faaa01ea6e80555549a6aecef86ee6657cce23183b411a64c31123db682a6944e2825eef83c76fb5a91620235f
+ARG 
ACCUMULO_HASH=adb23e56362c2e3e813d07791389b8ca2d5976df8b00a29b607e6ae05ea465eff80ada6d1ec9a9c596df8b4066c51078cd5a4006dc78568ac38f638a1d3895be
 
 # Download from Apache mirrors instead of archive #9
 ENV APACHE_DIST_URLS \
diff --git a/pom.xml b/pom.xml
index 746bf21..9f70da8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,7 +85,7 @@
     <url>https://github.com/apache/accumulo-proxy/actions</url>
   </ciManagement>
   <properties>
-    <accumulo.version>2.1.0</accumulo.version>
+    <accumulo.version>2.1.1</accumulo.version>
     
<eclipseFormatterStyle>contrib/Eclipse-Accumulo-Codestyle.xml</eclipseFormatterStyle>
     <!-- extra release args for testing -->
     <extraReleaseArguments />
diff --git a/src/main/java/org/apache/accumulo/proxy/Proxy.java 
b/src/main/java/org/apache/accumulo/proxy/Proxy.java
index 50e38a3..017b298 100644
--- a/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -38,7 +38,6 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.proxy.thrift.AccumuloProxy;
 import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
 import org.apache.accumulo.server.rpc.ServerAddress;
-import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftServerType;
 import org.apache.accumulo.server.rpc.TimedProcessor;
 import org.apache.accumulo.server.rpc.UGIAssumingProcessor;
@@ -251,10 +250,9 @@ public class Proxy implements KeywordExecutable {
         break;
     }
 
-    TimedProcessor timedProcessor = new TimedProcessor(processor, serverName, 
threadName);
+    TimedProcessor timedProcessor = new TimedProcessor(processor);
 
     // Create the thrift server with our processor and properties
-
     return TServerUtils.startTServer(serverType, timedProcessor, 
protocolFactory, serverName,
         threadName, numThreads, ThreadPools.DEFAULT_TIMEOUT_MILLISECS,
         ClientConfConverter.toAccumuloConf(props), 1000L, maxFrameSize, 
sslParams, saslParams,
diff --git a/src/main/java/org/apache/accumulo/proxy/TServerUtils.java 
b/src/main/java/org/apache/accumulo/proxy/TServerUtils.java
new file mode 100644
index 0000000..c378f59
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/proxy/TServerUtils.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.proxy;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLServerSocket;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.PropertyType;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.server.rpc.ClientInfoProcessorFactory;
+import org.apache.accumulo.server.rpc.CustomNonBlockingServer;
+import org.apache.accumulo.server.rpc.CustomThreadedSelectorServer;
+import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
+import org.apache.accumulo.server.rpc.SaslServerDigestCallbackHandler;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.rpc.TimedProcessor;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.primitives.Ints;
+
+/**
+ * Factory methods for creating Thrift server objects
+ */
+public class TServerUtils {
+  private static final Logger log = 
LoggerFactory.getLogger(TServerUtils.class);
+
+  /**
+   * Static instance, passed to {@link ClientInfoProcessorFactory}, which will 
contain the client
+   * address of any incoming RPC.
+   */
+  public static final ThreadLocal<String> clientAddress = new ThreadLocal<>();
+
+  /**
+   * Collects the ports configured for every {@link PropertyType#PORT} property other than the
+   * one supplied, so that a caller can tell which port numbers are already claimed.
+   *
+   * @param config Accumulo configuration the port values are read from
+   * @param portProperty the property to leave out of the result
+   * @return a map from each non-zero port number to the property that configures it
+   * @throws IllegalStateException if two properties resolve to the same port number, since the
+   *         collector is not given a merge function
+   */
+  static Map<Integer,Property> getReservedPorts(AccumuloConfiguration config,
+      Property portProperty) {
+    return EnumSet.allOf(Property.class).stream()
+        .filter(candidate -> candidate != portProperty)
+        .filter(candidate -> candidate.getType() == PropertyType.PORT)
+        // a port value of 0 is not a reservation, so drop it before pairing
+        .flatMap(reserved -> config.getPortStream(reserved).filter(port -> port != 0)
+            .mapToObj(port -> new Pair<>(port, reserved)))
+        .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
+  }
+
+  /**
+   * Create a non blocking server with multiple select threads and a custom 
thread pool that can
+   * dynamically resize itself.
+   */
+  private static ServerAddress createThreadedSelectorServer(HostAndPort 
address,
+      TProcessor processor, TProtocolFactory protocolFactory, final String 
serverName,
+      final int numThreads, final long threadTimeOut, final 
AccumuloConfiguration conf,
+      long timeBetweenThreadChecks, long maxMessageSize) throws 
TTransportException {
+
+    final TNonblockingServerSocket transport =
+        new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), 
address.getPort()), 0,
+            Ints.saturatedCast(maxMessageSize));
+
+    TThreadedSelectorServer.Args options = new 
TThreadedSelectorServer.Args(transport);
+
+    options.selectorThreads = Math.max(2, 
Runtime.getRuntime().availableProcessors() / 4);
+    log.info("selectorThreads : {}", options.selectorThreads);
+    options.protocolFactory(protocolFactory);
+    options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+    options.maxReadBufferBytes = maxMessageSize;
+    options.stopTimeoutVal(5);
+
+    // Create our own very special thread pool.
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, 
numThreads, threadTimeOut,
+        conf, timeBetweenThreadChecks);
+
+    options.executorService(pool);
+    options.processorFactory(new TProcessorFactory(processor));
+
+    if (address.getPort() == 0) {
+      address = HostAndPort.fromParts(address.getHost(), transport.getPort());
+    }
+
+    return new ServerAddress(new CustomThreadedSelectorServer(options), 
address);
+  }
+
+  /**
+   * Create a NonBlockingServer with a single select threads and a custom 
thread pool that can
+   * dynamically resize itself.
+   */
+  private static ServerAddress createNonBlockingServer(HostAndPort address, 
TProcessor processor,
+      TProtocolFactory protocolFactory, final String serverName, final int 
numThreads,
+      final long threadTimeOut, final AccumuloConfiguration conf, long 
timeBetweenThreadChecks,
+      long maxMessageSize) throws TTransportException {
+
+    final TNonblockingServerSocket transport =
+        new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), 
address.getPort()), 0,
+            Ints.saturatedCast(maxMessageSize));
+    final CustomNonBlockingServer.Args options = new 
CustomNonBlockingServer.Args(transport);
+
+    options.protocolFactory(protocolFactory);
+    options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+    options.maxReadBufferBytes = maxMessageSize;
+    options.stopTimeoutVal(5);
+
+    // Create our own very special thread pool.
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, 
numThreads, threadTimeOut,
+        conf, timeBetweenThreadChecks);
+
+    options.executorService(pool);
+    options.processorFactory(new TProcessorFactory(processor));
+
+    if (address.getPort() == 0) {
+      address = HostAndPort.fromParts(address.getHost(), transport.getPort());
+    }
+
+    return new ServerAddress(new CustomNonBlockingServer(options), address);
+  }
+
+  /**
+   * Creates a {@link ThreadPoolExecutor} named {@code serverName + "-ClientPool"} and registers
+   * a periodic check that inspects the pool's core size and active thread count and grows or
+   * shrinks the core size to follow the load:
+   * <ul>
+   * <li>when every core thread is busy (core size &lt;= active count), the core size grows by
+   * the number of queued tasks, but by at most 2 per check;</li>
+   * <li>otherwise, when more than 3 core threads are idle (core size &gt; active count + 3), the
+   * core size shrinks by 1 per check, never dropping below {@code executorThreads}.</li>
+   * </ul>
+   * The pool state is sampled without locking, so a check may act on slightly stale numbers; the
+   * next check corrects it.
+   *
+   * @param serverName A name to describe the thrift server this executor will service
+   * @param executorThreads The initial, and minimum, number of threads for the executor
+   * @param threadTimeOut The time, in milliseconds, after which threads are allowed to terminate
+   *        including core threads. If set to 0, the core threads will indefinitely stay running
+   *        waiting for work.
+   * @param conf Accumulo Configuration, passed on to the periodic check
+   * @param timeBetweenThreadChecks The amount of time, in millis, between attempts to resize the
+   *        executor thread pool
+   * @return A {@link ThreadPoolExecutor} which will resize itself automatically
+   */
+  private static ThreadPoolExecutor createSelfResizingThreadPool(final String 
serverName,
+      final int executorThreads, long threadTimeOut, final 
AccumuloConfiguration conf,
+      long timeBetweenThreadChecks) {
+    final ThreadPoolExecutor pool = 
ThreadPools.getServerThreadPools().createFixedThreadPool(
+        executorThreads, threadTimeOut, TimeUnit.MILLISECONDS, serverName + 
"-ClientPool", true);
+    // periodically adjust the number of threads we need by checking how busy 
our threads are
+    ThreadPools.watchCriticalFixedDelay(conf, timeBetweenThreadChecks, () -> {
+      // there is a minor race condition between sampling the current state of 
the thread pool
+      // and adjusting it however, this isn't really an issue, since it 
adjusts periodically
+      if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+        int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 
2);
+        ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool");
+      } else {
+        if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+          int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
+          ThreadPools.resizePool(pool, () -> smaller, serverName + 
"-ClientPool");
+        }
+      }
+    });
+    return pool;
+  }
+
+  /**
+   * Creates a TThreadPoolServer for normal unsecure operation. Useful for 
comparing performance
+   * against SSL or SASL transports.
+   *
+   * @param address Address to bind to
+   * @param processor TProcessor for the server
+   * @param maxMessageSize Maximum size of a Thrift message allowed
+   * @return A configured TThreadPoolServer and its bound address information
+   */
+  private static ServerAddress createBlockingServer(HostAndPort address, 
TProcessor processor,
+      TProtocolFactory protocolFactory, long maxMessageSize, String 
serverName, int numThreads,
+      long threadTimeOut, final AccumuloConfiguration conf, long 
timeBetweenThreadChecks)
+      throws TTransportException {
+
+    InetSocketAddress isa = new InetSocketAddress(address.getHost(), 
address.getPort());
+    // Must use an ISA, providing only a port would ignore the hostname given
+    TServerSocket transport = new TServerSocket(isa);
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, 
numThreads, threadTimeOut,
+        conf, timeBetweenThreadChecks);
+    TThreadPoolServer server = createTThreadPoolServer(transport, processor,
+        ThriftUtil.transportFactory(maxMessageSize), protocolFactory, pool);
+
+    if (address.getPort() == 0) {
+      address =
+          HostAndPort.fromParts(address.getHost(), 
transport.getServerSocket().getLocalPort());
+      log.info("Blocking Server bound on {}", address);
+    }
+
+    return new ServerAddress(server, address);
+
+  }
+
+  /**
+   * Create a {@link TThreadPoolServer} with the provided server transport, 
processor and transport
+   * factory.
+   *
+   * @param transport TServerTransport for the server
+   * @param processor TProcessor for the server
+   * @param transportFactory TTransportFactory for the server
+   */
+  private static TThreadPoolServer createTThreadPoolServer(TServerTransport 
transport,
+      TProcessor processor, TTransportFactory transportFactory, 
TProtocolFactory protocolFactory,
+      ExecutorService service) {
+    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+    options.protocolFactory(protocolFactory);
+    options.transportFactory(transportFactory);
+    options.processorFactory(new ClientInfoProcessorFactory(clientAddress, 
processor));
+    if (service != null) {
+      options.executorService(service);
+    }
+    return new TThreadPoolServer(options);
+  }
+
+  /**
+   * Create the Thrift server socket for RPC running over SSL and restrict its enabled protocols
+   * to those allowed by the configuration.
+   *
+   * @param port Port of the server socket to bind to
+   * @param timeout Socket timeout
+   * @param address Address to bind the socket to
+   * @param params SSL parameters
+   * @return A configured TServerSocket configured to use SSL
+   * @throws TTransportException if Thrift fails to create the server socket
+   * @throws RuntimeException if none of the protocols enabled on the socket is allowed by the
+   *         configuration; the socket is closed before this is thrown
+   */
+  private static TServerSocket getSslServerSocket(int port, int timeout, InetAddress address,
+      SslConnectionParams params) throws TTransportException {
+    TServerSocket tServerSock;
+    if (params.useJsse()) {
+      tServerSock =
+          TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address);
+    } else {
+      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address,
+          params.getTTransportParams());
+    }
+
+    final ServerSocket serverSock = tServerSock.getServerSocket();
+    if (serverSock instanceof SSLServerSocket) {
+      SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
+      String[] protocols = params.getServerProtocols();
+
+      // Be nice for the user and automatically remove protocols that might not exist in their
+      // JVM. Keeps us from forcing config alterations too
+      // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
+      Set<String> socketEnabledProtocols =
+          new HashSet<>(Arrays.asList(sslServerSock.getEnabledProtocols()));
+      // Keep only the enabled protocols that were specified by the configuration
+      socketEnabledProtocols.retainAll(Arrays.asList(protocols));
+      if (socketEnabledProtocols.isEmpty()) {
+        // Bad configuration... build the message while the socket is still open, then release
+        // the already-bound socket so the failed attempt does not keep the port occupied
+        final String message =
+            "No available protocols available for secure socket. Available protocols: "
+                + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: "
+                + Arrays.toString(protocols);
+        tServerSock.close();
+        throw new RuntimeException(message);
+      }
+
+      // Set the protocol(s) on the server socket
+      sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
+    }
+
+    return tServerSock;
+  }
+
+  /**
+   * Create a Thrift SSL server backed by a {@link TThreadPoolServer} and a self-resizing worker
+   * pool.
+   *
+   * @param address host and port to bind to; a port of 0 is replaced in the result by the port
+   *        the socket actually bound
+   * @param processor TProcessor for the server
+   * @param protocolFactory Thrift protocol factory for the server
+   * @param socketTimeout Socket timeout; values beyond the {@code int} range are clamped rather
+   *        than wrapped
+   * @param sslParams SSL parameters
+   * @param serverName name used to label the worker thread pool
+   * @param numThreads minimum number of worker threads
+   * @param threadTimeOut idle timeout, in milliseconds, handed to the worker thread pool
+   * @param conf Accumulo configuration handed to the pool's periodic resize check
+   * @param timeBetweenThreadChecks delay between resize checks of the worker pool
+   * @return A ServerAddress with the bound-socket information and the Thrift server
+   * @throws TTransportException if the host cannot be resolved or the socket cannot be created
+   */
+  private static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor,
+      TProtocolFactory protocolFactory, long socketTimeout, SslConnectionParams sslParams,
+      String serverName, int numThreads, long threadTimeOut, final AccumuloConfiguration conf,
+      long timeBetweenThreadChecks) throws TTransportException {
+    TServerSocket transport;
+    try {
+      // saturatedCast instead of a plain (int) cast: an oversized timeout must not wrap around
+      // to an unrelated (possibly negative) value
+      transport = getSslServerSocket(address.getPort(), Ints.saturatedCast(socketTimeout),
+          InetAddress.getByName(address.getHost()), sslParams);
+    } catch (UnknownHostException e) {
+      throw new TTransportException(e);
+    }
+
+    if (address.getPort() == 0) {
+      // an ephemeral port was requested, so report the one that was really bound
+      address =
+          HostAndPort.fromParts(address.getHost(), transport.getServerSocket().getLocalPort());
+      log.info("SSL Thread Pool Server bound on {}", address);
+    }
+
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
+        conf, timeBetweenThreadChecks);
+
+    return new ServerAddress(createTThreadPoolServer(transport, processor,
+        ThriftUtil.transportFactory(), protocolFactory, pool), address);
+  }
+
+  /**
+   * Create a Thrift server secured with SASL, backed by a {@link TThreadPoolServer} and a
+   * self-resizing worker pool. A GSSAPI server definition is always registered; a DIGEST-MD5
+   * definition is added only when the parameters carry a secret manager.
+   *
+   * @param address host and port to bind to; a port of 0 is replaced in the result by the port
+   *        the socket actually bound
+   * @param processor TProcessor for the server
+   * @param protocolFactory Thrift protocol factory for the server
+   * @param socketTimeout Socket timeout; values beyond the {@code int} range are clamped rather
+   *        than wrapped
+   * @param params SASL parameters
+   * @param serverName name used to label the worker thread pool
+   * @param numThreads minimum number of worker threads
+   * @param threadTimeOut idle timeout, in milliseconds, handed to the worker thread pool
+   * @param conf Accumulo configuration handed to the pool's periodic resize check
+   * @param timeBetweenThreadChecks delay between resize checks of the worker pool
+   * @return A ServerAddress with the bound-socket information and the Thrift server
+   * @throws TTransportException if the socket cannot be created, a hostname cannot be resolved,
+   *         or the login user cannot be obtained
+   * @throws RuntimeException if the canonical name of the bind host differs from the FQDN of
+   *         this host
+   */
+  private static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor,
+      TProtocolFactory protocolFactory, long socketTimeout, SaslServerConnectionParams params,
+      final String serverName, final int numThreads, final long threadTimeOut,
+      final AccumuloConfiguration conf, long timeBetweenThreadChecks) throws TTransportException {
+    // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the
+    // TThreadPoolServer does, but sadly this isn't the case. Because TSaslTransport needs to
+    // issue a handshake when it open()'s which will fail when the server does an accept() to
+    // (presumably) wake up the eventing system.
+    log.info("Creating SASL thread pool thrift server on listening on {}:{}", address.getHost(),
+        address.getPort());
+    InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort());
+    // Must use an ISA, providing only a port would ignore the hostname given.
+    // saturatedCast instead of a plain (int) cast: an oversized timeout must not wrap around
+    // to an unrelated (possibly negative) value
+    TServerSocket transport = new TServerSocket(isa, Ints.saturatedCast(socketTimeout));
+
+    String hostname, fqdn;
+    try {
+      hostname = InetAddress.getByName(address.getHost()).getCanonicalHostName();
+      fqdn = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException e) {
+      transport.close();
+      throw new TTransportException(e);
+    }
+
+    // If we can't get a real hostname from the provided host, use the hostname from DNS for
+    // localhost
+    if ("0.0.0.0".equals(hostname)) {
+      hostname = fqdn;
+    }
+
+    // ACCUMULO-3497 an easy sanity check we can perform for the user when SASL is enabled.
+    // Clients and servers have to agree upon the FQDN so that the SASL handshake can occur. If
+    // the provided hostname doesn't match the FQDN for this host, fail quickly and inform them
+    // to update their configuration.
+    if (!hostname.equals(fqdn)) {
+      log.error("Expected hostname of '{}' but got '{}'. Ensure the entries in"
+          + " the Accumulo hosts files (e.g. managers, tservers) are the FQDN for"
+          + " each host when using SASL.", fqdn, hostname);
+      transport.close();
+      throw new RuntimeException("SASL requires that the address the thrift"
+          + " server listens on is the same as the FQDN for this host");
+    }
+
+    final UserGroupInformation serverUser;
+    try {
+      serverUser = UserGroupInformation.getLoginUser();
+    } catch (IOException e) {
+      transport.close();
+      ThriftUtil.checkIOExceptionCause(e);
+      throw new TTransportException(e);
+    }
+
+    log.debug("Logged in as {}, creating TSaslServerTransport factory with {}/{}", serverUser,
+        params.getKerberosServerPrimary(), hostname);
+
+    // Make the SASL transport factory with the instance and primary from the kerberos server
+    // principal, SASL properties and the SASL callback handler from Hadoop to ensure
+    // authorization ID is the authentication ID. Despite the 'protocol' argument seeming to be
+    // useless, it *must* be the primary of the server.
+    TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
+    saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(),
+        hostname, params.getSaslProperties(), new SaslRpcServer.SaslGssCallbackHandler());
+
+    if (params.getSecretManager() != null) {
+      log.info("Adding DIGEST-MD5 server definition for delegation tokens");
+      saslTransportFactory.addServerDefinition(ThriftUtil.DIGEST_MD5,
+          params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
+          new SaslServerDigestCallbackHandler(params.getSecretManager()));
+    } else {
+      log.info("SecretManager is null, not adding support for delegation token authentication");
+    }
+
+    // Make sure the TTransportFactory is performing a UGI.doAs
+    TTransportFactory ugiTransportFactory =
+        new UGIAssumingTransportFactory(saslTransportFactory, serverUser);
+
+    if (address.getPort() == 0) {
+      // If the port was chosen dynamically, report the one that was really bound (along with
+      // the hostname that was asked for)
+      address =
+          HostAndPort.fromParts(address.getHost(), transport.getServerSocket().getLocalPort());
+      log.info("SASL thrift server bound on {}", address);
+    }
+
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
+        conf, timeBetweenThreadChecks);
+
+    final TThreadPoolServer server =
+        createTThreadPoolServer(transport, processor, ugiTransportFactory, protocolFactory, pool);
+
+    return new ServerAddress(server, address);
+  }
+
+  /**
+   * Create the Thrift server selected by {@code serverType} on the first of the given addresses
+   * that can be bound, then start it serving on a new thread named {@code threadName}.
+   * <p>
+   * Addresses are tried in order: a {@link TTransportException} while creating the server is
+   * logged as a warning and the next address is tried. If the server was bound to the wildcard
+   * host {@code 0.0.0.0}, the returned address carries the local host name instead. An
+   * {@link Error} thrown while serving halts the process.
+   *
+   * @param serverType which kind of server to create (SSL, SASL, THREADPOOL, THREADED_SELECTOR
+   *        or CUSTOM_HS_HA)
+   * @param processor processor that handles the requests
+   * @param protocolFactory Thrift protocol factory for the server
+   * @param serverName name used to label the worker thread pool
+   * @param threadName name of the thread that runs the server
+   * @param numThreads minimum number of worker threads
+   * @param threadTimeOut idle timeout handed to the worker thread pool
+   * @param conf Accumulo configuration handed to the worker thread pool
+   * @param timeBetweenThreadChecks delay between resize checks of the worker thread pool
+   * @param maxMessageSize maximum Thrift message size; used by the THREADPOOL,
+   *        THREADED_SELECTOR and CUSTOM_HS_HA server types only
+   * @param sslParams SSL parameters, used by the SSL server type; must be null when
+   *        {@code saslParams} is set
+   * @param saslParams SASL parameters, used by the SASL server type; must be null when
+   *        {@code sslParams} is set
+   * @param serverSocketTimeout socket timeout; used by the SSL and SASL server types only
+   * @param addresses candidate host/port pairs, tried in order
+   * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is
+   *         bound to.
+   * @throws TTransportException if no server could be created on any of the addresses, or the
+   *         local host name cannot be resolved for a wildcard bind
+   * @throws IllegalArgumentException if both SSL and SASL parameters are given, or the server
+   *         type is not one of the handled types
+   */
+  public static ServerAddress startTServer(ThriftServerType serverType, 
TimedProcessor processor,
+      TProtocolFactory protocolFactory, String serverName, String threadName, 
int numThreads,
+      long threadTimeOut, final AccumuloConfiguration conf, long 
timeBetweenThreadChecks,
+      long maxMessageSize, SslConnectionParams sslParams, 
SaslServerConnectionParams saslParams,
+      long serverSocketTimeout, HostAndPort... addresses) throws 
TTransportException {
+
+    // This is presently not supported. It's hypothetically possible, I 
believe, to work, but it
+    // would require changes in how the transports
+    // work at the Thrift layer to ensure that both the SSL and SASL 
handshakes function. SASL's
+    // quality of protection addresses privacy issues.
+    checkArgument(sslParams == null || saslParams == null,
+        "Cannot start a Thrift server using both SSL and SASL");
+
+    ServerAddress serverAddress = null;
+    for (HostAndPort address : addresses) {
+      try {
+        switch (serverType) {
+          case SSL:
+            log.debug("Instantiating SSL Thrift server");
+            serverAddress = createSslThreadPoolServer(address, processor, 
protocolFactory,
+                serverSocketTimeout, sslParams, serverName, numThreads, 
threadTimeOut, conf,
+                timeBetweenThreadChecks);
+            break;
+          case SASL:
+            log.debug("Instantiating SASL Thrift server");
+            serverAddress = createSaslThreadPoolServer(address, processor, 
protocolFactory,
+                serverSocketTimeout, saslParams, serverName, numThreads, 
threadTimeOut, conf,
+                timeBetweenThreadChecks);
+            break;
+          case THREADPOOL:
+            log.debug("Instantiating unsecure TThreadPool Thrift server");
+            serverAddress =
+                createBlockingServer(address, processor, protocolFactory, 
maxMessageSize,
+                    serverName, numThreads, threadTimeOut, conf, 
timeBetweenThreadChecks);
+            break;
+          case THREADED_SELECTOR:
+            log.debug("Instantiating default, unsecure Threaded selector 
Thrift server");
+            serverAddress =
+                createThreadedSelectorServer(address, processor, 
protocolFactory, serverName,
+                    numThreads, threadTimeOut, conf, timeBetweenThreadChecks, 
maxMessageSize);
+            break;
+          case CUSTOM_HS_HA:
+            log.debug("Instantiating unsecure custom half-async Thrift 
server");
+            serverAddress = createNonBlockingServer(address, processor, 
protocolFactory, serverName,
+                numThreads, threadTimeOut, conf, timeBetweenThreadChecks, 
maxMessageSize);
+            break;
+          default:
+            throw new IllegalArgumentException("Unknown server type " + 
serverType);
+        }
+        break;
+      } catch (TTransportException e) {
+        log.warn("Error attempting to create server at {}. Error: {}", 
address, e.getMessage());
+      }
+    }
+    if (serverAddress == null) {
+      throw new TTransportException(
+          "Unable to create server on addresses: " + 
Arrays.toString(addresses));
+    }
+
+    final TServer finalServer = serverAddress.server;
+
+    Threads.createThread(threadName, () -> {
+      try {
+        finalServer.serve();
+      } catch (Error e) {
+        Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 
1);
+      }
+    }).start();
+
+    // check for the special "bind to everything address"
+    if (serverAddress.address.getHost().equals("0.0.0.0")) {
+      // can't get the address from the bind, so we'll do our best to invent 
our hostname
+      try {
+        serverAddress = new ServerAddress(finalServer, HostAndPort
+            .fromParts(InetAddress.getLocalHost().getHostName(), 
serverAddress.address.getPort()));
+      } catch (UnknownHostException e) {
+        throw new TTransportException(e);
+      }
+    }
+    return serverAddress;
+  }
+}

Reply via email to