Added: tomcat/trunk/java/org/apache/tomcat/jni/socket/AprSocketContext.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/jni/socket/AprSocketContext.java?rev=1686276&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/jni/socket/AprSocketContext.java (added) +++ tomcat/trunk/java/org/apache/tomcat/jni/socket/AprSocketContext.java Thu Jun 18 17:10:08 2015 @@ -0,0 +1,1352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.jni.socket; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.tomcat.jni.Address; +import org.apache.tomcat.jni.Error; +import org.apache.tomcat.jni.Library; +import org.apache.tomcat.jni.OS; +import org.apache.tomcat.jni.Poll; +import org.apache.tomcat.jni.Pool; +import org.apache.tomcat.jni.SSL; +import org.apache.tomcat.jni.SSLContext; +import org.apache.tomcat.jni.SSLExt; +import org.apache.tomcat.jni.Socket; +import org.apache.tomcat.jni.Status; + +public class AprSocketContext { + /** + * Called when a chunk of data is sent or received. This is very low + * level, used mostly for debugging or stats. + */ + public static interface RawDataHandler { + public void rawData(AprSocket ch, boolean input, byte[] data, int pos, + int len, int requested, boolean closed); + } + + /** + * Called in SSL mode after the handshake is completed. + * + * @see AprSocketContext#customVerification(TlsCertVerifier) + */ + public static interface TlsCertVerifier { + public void handshakeDone(AprSocket ch); + } + + /** + * Delegates loading of persistent info about a host - public certs, + * tickets, config, persistent info etc. + */ + public static interface HostInfoLoader { + public HostInfo getHostInfo(String name, int port, boolean ssl); + } + + private static final Logger log = Logger.getLogger("AprSocketCtx"); + + // If interrupt() or thread-safe poll update are not supported - the + // poll updates will happen after the poll() timeout. + // The poll timeout with interrupt/thread safe updates can be much higher/ + private static final int FALLBACK_POLL_TIME = 2000; + + // It seems to send the ticket, get server helo / ChangeCipherSpec, but than + // SSL3_GET_RECORD:decryption failed or bad record mac in s3_pkt.c:480: + // Either bug in openssl, or some combination of ciphers - needs more debugging. + // ( this can save a roundtrip and CPU on TLS handshake ) + boolean USE_TICKETS = false; + + private final AprSocket END = new AprSocket(this); + + private static final AtomicInteger contextNumber = new AtomicInteger(); + private int contextId; + + private final AtomicInteger threadNumber = new AtomicInteger(); + + /** + * For now - single acceptor thread per connector. + */ + private AcceptorThread acceptor; + private AcceptorDispatchThread acceptorDispatch; + + // APR/JNI is thread safe + private boolean threadSafe = true; + + /** + * Pollers. + */ + private final List<AprPoller> pollers = new ArrayList<>(); + + // Set on all accepted or connected sockets. + // TODO: add the other properties + boolean tcpNoDelay = true; + + protected boolean running = true; + + protected boolean sslMode; + + // onSocket() will be called in accept thread. + // If false: use executor ( but that may choke the acceptor thread ) + private boolean nonBlockingAccept = false; + + private final BlockingQueue<AprSocket> acceptedQueue = + new LinkedBlockingQueue<>(); + + /** + * Root APR memory pool. + */ + private long rootPool = 0; + + /** + * SSL context. + */ + private volatile long sslCtx = 0; + + TlsCertVerifier tlsCertVerifier; + + // + final int connectTimeout = 20000; + final int defaultTimeout = 100000; + // TODO: Use this + final int keepAliveTimeout = 20000; + + final AtomicInteger open = new AtomicInteger(); + + /** + * Poll interval, in microseconds. If the platform doesn't support + * poll interrupt - it'll take this time to stop the poller. + * + */ + private int pollTime = 5 * 1000000; + + private HostInfoLoader hostInfoLoader; + + final RawDataHandler rawDataHandler = null; + + // TODO: do we need this here ? + private final Map<String, HostInfo> hosts = new HashMap<>(); + + private String certFile; + private String keyFile; + + private byte[] spdyNPN; + + private byte[] ticketKey; + + // For resolving DNS ( i.e. connect ), callbacks + private ExecutorService threadPool; + + // Separate executor for connect/handshakes + final ExecutorService connectExecutor; + + final boolean debugSSL = false; + private boolean debugPoll = false; + + private boolean deferAccept = false; + + private int backlog = 100; + + private boolean useSendfile; + + private int sslProtocol = SSL.SSL_PROTOCOL_TLSV1 | SSL.SSL_PROTOCOL_TLSV1_1 | SSL.SSL_PROTOCOL_TLSV1_2; + + /** + * Max time spent in a callback ( will be longer for blocking ) + */ + final AtomicLong maxHandlerTime = new AtomicLong(); + final AtomicLong totalHandlerTime = new AtomicLong(); + final AtomicLong handlerCount = new AtomicLong(); + + /** + * Total connections handled ( accepted or connected ). + */ + private final AtomicInteger connectionsCount = new AtomicInteger(); + + + public AprSocketContext() { + connectExecutor =new ThreadPoolExecutor(0, 64, 5, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, + java.util.concurrent.ThreadPoolExecutor executor) { + AprSocket s = (AprSocket) r; + log.severe("Rejecting " + s); + s.reset(); + } + }); + contextId = contextNumber.incrementAndGet(); + } + + /** + * Poller thread count. + */ + private int pollerThreadCount = 4; + public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; } + public int getPollerThreadCount() { return pollerThreadCount; } + + // to test the limits - default should be lower + private int maxConnections = 64 * 1024; + public void setMaxconnections(int maxCon) { + this.maxConnections = maxCon; + } + + public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } + public int getBacklog() { return backlog; } + + /** + * Defer accept. + */ + public void setDeferAccept(boolean deferAccept) { this.deferAccept = deferAccept; } + public boolean getDeferAccept() { return deferAccept; } + + /** + * For client: + * - ClientHello will include the npn extension ( the ID == 0x3374) + * - if ServerHello includes a list of protocols - select one + * - send it after ChangeCipherSpec and before Finish + * + * For server: + * - if ClientHello includes the npn extension + * -- will send this string as list of supported protocols in ServerHello + * - read the selection before Finish. + * @param npn + */ + public void setNpn(String npn) { + byte[] data = npn.getBytes(); + byte[] npnB = new byte[data.length + 2]; + + System.arraycopy(data, 0, npnB, 1, data.length); + npnB[0] = (byte) data.length; + npnB[npnB.length - 1] = 0; + spdyNPN = npnB; + + } + + public void setNpn(byte[] data) { + spdyNPN = data; + } + + public void setHostLoader(HostInfoLoader handler) { + this.hostInfoLoader = handler; + } + + public boolean isServer() { + return acceptor != null; + } + + protected Executor getExecutor() { + if (threadPool == null) { + threadPool = Executors.newCachedThreadPool(new ThreadFactory( ) { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "AprThread-" + contextId + "-" + + threadNumber.incrementAndGet()); + t.setDaemon(true); + return t; + } + }); + } + return threadPool; + } + + /** + * All accepted/connected sockets will start handshake automatically. + */ + public AprSocketContext setTls() { + this.sslMode = true; + return this; + } + + public void setTcpNoDelay(boolean b) { + tcpNoDelay = b; + } + + public void setSslProtocol(String protocol) { + protocol = protocol.trim(); + if ("SSLv2".equalsIgnoreCase(protocol)) { + sslProtocol = SSL.SSL_PROTOCOL_SSLV2; + } else if ("SSLv3".equalsIgnoreCase(protocol)) { + sslProtocol = SSL.SSL_PROTOCOL_SSLV3; + } else if ("TLSv1".equalsIgnoreCase(protocol)) { + sslProtocol = SSL.SSL_PROTOCOL_TLSV1; + } else if ("TLSv1.1".equalsIgnoreCase(protocol)) { + sslProtocol = SSL.SSL_PROTOCOL_TLSV1_1; + } else if ("TLSv1.2".equalsIgnoreCase(protocol)) { + sslProtocol = SSL.SSL_PROTOCOL_TLSV1_2; + } else if ("all".equalsIgnoreCase(protocol)) { + sslProtocol = SSL.SSL_PROTOCOL_ALL; + } + } + + public void setTicketKey(byte[] key48Bytes) { + if(key48Bytes.length != 48) { + throw new RuntimeException("Key must be 48 bytes"); + } + this.ticketKey = key48Bytes; + } + + public void customVerification(TlsCertVerifier verifier) { + tlsCertVerifier = verifier; + } + + // TODO: should have a separate method for switching to tls later. + /** + * Set certificate, will also enable TLS mode. + */ + public AprSocketContext setKeys(String certPemFile, String keyDerFile) { + this.sslMode = true; + setTls(); + certFile = certPemFile; + keyFile = keyDerFile; + return this; + } + + /** + * SSL cipher suite. + */ + private String SSLCipherSuite = "ALL"; + public String getSSLCipherSuite() { return SSLCipherSuite; } + public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; } + + /** + * Override or use hostInfoLoader to implement persistent/memcache storage. + */ + public HostInfo getHostInfo(String host, int port, boolean ssl) { + if (hostInfoLoader != null) { + return hostInfoLoader.getHostInfo(host, port, ssl); + } + // Use local cache + String key = host + ":" + port; + HostInfo pi = hosts.get(key); + if (pi != null) { + return pi; + } + pi = new HostInfo(host, port, ssl); + hosts.put(key, pi); + return pi; + } + + protected void rawData(AprSocket ch, boolean inp, byte[] data, int pos, + int len, int requested, boolean closed) { + if (rawDataHandler != null) { + rawDataHandler.rawData(ch, inp, data, pos, len, requested, closed); + } + } + + public void listen(final int port) throws IOException { + if (acceptor != null) { + throw new IOException("Already accepting on " + acceptor.port); + } + if (sslMode && certFile == null) { + throw new IOException("Missing certificates for server"); + } + if (sslMode || !nonBlockingAccept) { + acceptorDispatch = new AcceptorDispatchThread(); + acceptorDispatch.setName("AprAcceptorDispatch-" + port); + acceptorDispatch.start(); + } + + acceptor = new AcceptorThread(port); + acceptor.prepare(); + acceptor.setName("AprAcceptor-" + port); + acceptor.start(); + + + } + + /** + * Get a socket for connectiong to host:port. + */ + public AprSocket socket(String host, int port, boolean ssl) { + HostInfo hi = getHostInfo(host, port, ssl); + return socket(hi); + } + + public AprSocket socket(HostInfo hi) { + AprSocket sock = newSocket(this); + sock.setHost(hi); + return sock; + } + + public AprSocket socket(long socket) { + AprSocket sock = newSocket(this); + // Tomcat doesn't set this + SSLExt.sslSetMode(socket, SSLExt.SSL_MODE_ENABLE_PARTIAL_WRITE | + SSLExt.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); + sock.setStatus(AprSocket.ACCEPTED); + sock.socket = socket; + return sock; + } + + + void destroySocket(AprSocket socket) { + // TODO: does it need to be done in io thread ? + synchronized (socket) { + if (socket.socket != 0) { + long s = socket.socket; + socket.socket = 0; + log.info("DESTROY: " + Long.toHexString(s)); + Socket.destroy(s); + } + } + } + + protected void connectBlocking(AprSocket apr) throws IOException { + try { + if (!running) { + throw new IOException("Stopped"); + } + HostInfo hi = apr.getHost(); + + long clientSockP; + synchronized (pollers) { + long socketpool = Pool.create(getRootPool()); + + int family = Socket.APR_INET; + + clientSockP = Socket.create(family, + Socket.SOCK_STREAM, + Socket.APR_PROTO_TCP, socketpool); // or rootPool ? + } + Socket.timeoutSet(clientSockP, connectTimeout * 1000); + if (OS.IS_UNIX) { + Socket.optSet(clientSockP, Socket.APR_SO_REUSEADDR, 1); + } + + Socket.optSet(clientSockP, Socket.APR_SO_KEEPALIVE, 1); + + // Blocking + // TODO: use socket pool + // TODO: cache it ( and TTL ) in hi + long inetAddress = Address.info(hi.host, Socket.APR_INET, + hi.port, 0, rootPool); + // this may take a long time - stop/destroy must wait + // at least connect timeout + int rc = Socket.connect(clientSockP, inetAddress); + + if (rc != 0) { + synchronized (pollers) { + Socket.close(clientSockP); + Socket.destroy(clientSockP); + } + /////Pool.destroy(socketpool); + throw new IOException("Socket.connect(): " + rc + " " + Error.strerror(rc) + " " + connectTimeout); + } + if (!running) { + throw new IOException("Stopped"); + } + + connectionsCount.incrementAndGet(); + if (tcpNoDelay) { + Socket.optSet(clientSockP, Socket.APR_TCP_NODELAY, 1); + } + + Socket.timeoutSet(clientSockP, defaultTimeout * 1000); + + apr.socket = clientSockP; + + apr.afterConnect(); + } catch (IOException e) { + apr.reset(); + throw e; + } catch (Throwable e) { + apr.reset(); + e.printStackTrace(); + throw new IOException(e); + } + } + + AprSocket newSocket(AprSocketContext context) { + return new AprSocket(context); + } + + /** + * To clean the pools - we could track if all channels are + * closed, but this seems simpler and safer. + */ + @Override + protected void finalize() throws Throwable { + if (rootPool != 0) { + log.warning(this + " GC without stop()"); + try { + stop(); + } catch (Exception e) { + //TODO Auto-generated catch block + e.printStackTrace(); + } + } + super.finalize(); + } + + + public void stop() { + synchronized (pollers) { + if (!running) { + return; + } + running = false; + } + + if (rootPool != 0) { + if (acceptor != null) { + try { + acceptor.unblock(); + acceptor.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if (acceptorDispatch != null) { + acceptedQueue.add(END); + try { + acceptorDispatch.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if (threadPool != null) { + threadPool.shutdownNow(); + } + + log.info("Stopping pollers " + contextId); + + while (true) { + AprPoller a; + synchronized (pollers) { + if (pollers.size() == 0) { + break; + } + a = pollers.remove(0); + } + a.interruptPoll(); + try { + a.join(); + log.info("Poller " + a.id + " done "); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + + // Called when the last poller has been destroyed. + void destroy() { + synchronized (pollers) { + if (pollers.size() != 0) { + return; + } + + if (rootPool == 0) { + return; + } + log.info("Destroy root pool " + rootPool); + //Pool.destroy(rootPool); + //rootPool = 0; + } + } + + private static IOException noApr; + static { + + try { + Library.initialize(null); + SSL.initialize(null); + } catch (Exception e) { + noApr = new IOException("APR not present", e); + } + + } + + private long getRootPool() throws IOException { + if (rootPool == 0) { + if (noApr != null) { + throw noApr; + } + // Create the root APR memory pool + rootPool = Pool.create(0); + + // Adjust poller sizes + if ((OS.IS_WIN32 || OS.IS_WIN64) && (maxConnections > 1024)) { + // The maximum per poller to get reasonable performance is 1024 + pollerThreadCount = maxConnections / 1024; + // Adjust poller size so that it won't reach the limit + maxConnections = maxConnections - (maxConnections % 1024); + } + } + return rootPool; + } + + long getSslCtx() throws Exception { + if (sslCtx == 0) { + synchronized (AprSocketContext.class) { + if (sslCtx == 0) { + boolean serverMode = acceptor != null; + sslCtx = SSLContext.make(getRootPool(), + sslProtocol, + serverMode ? SSL.SSL_MODE_SERVER : SSL.SSL_MODE_CLIENT); + + + // SSL.SSL_OP_NO_SSLv3 + int opts = SSL.SSL_OP_NO_SSLv2 | + SSL.SSL_OP_SINGLE_DH_USE; + + if (!USE_TICKETS || serverMode && ticketKey == null) { + opts |= SSL.SSL_OP_NO_TICKET; + } + + SSLContext.setOptions(sslCtx, opts); + // Set revocation + // SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath); + + // Client certificate verification - maybe make it option + try { + SSLContext.setCipherSuite(sslCtx, SSLCipherSuite); + + + if (serverMode) { + if (ticketKey != null) { + //SSLExt.setTicketKeys(sslCtx, ticketKey, ticketKey.length); + } + if (certFile != null) { + boolean rc = SSLContext.setCertificate(sslCtx, + certFile, + keyFile, null, SSL.SSL_AIDX_DSA); + if (!rc) { + throw new IOException("Can't set keys"); + } + } + SSLContext.setVerify(sslCtx, SSL.SSL_CVERIFY_NONE, 10); + + if (spdyNPN != null) { + SSLExt.setNPN(sslCtx, spdyNPN, spdyNPN.length); + } + } else { + if (tlsCertVerifier != null) { + // NONE ? + SSLContext.setVerify(sslCtx, + SSL.SSL_CVERIFY_NONE, 10); + } else { + SSLContext.setCACertificate(sslCtx, + "/etc/ssl/certs/ca-certificates.crt", + "/etc/ssl/certs"); + SSLContext.setVerify(sslCtx, + SSL.SSL_CVERIFY_REQUIRE, 10); + } + + if (spdyNPN != null) { + SSLExt.setNPN(sslCtx, spdyNPN, spdyNPN.length); + } + } + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + } + // TODO: try release buffers + } + } + return sslCtx; + } + + void findPollerAndAdd(AprSocket ch) throws IOException { + if (ch.poller != null) { + ch.poller.requestUpdate(ch); + return; + } + assignPoller(ch); + } + + void assignPoller(AprSocket ch) throws IOException { + AprPoller target = null; + synchronized (pollers) { + // Make sure we have min number of pollers + int needPollers = pollerThreadCount - pollers.size(); + if (needPollers > 0) { + for (int i = needPollers; i > 0; i--) { + pollers.add(allocatePoller()); + } + } + int max = 0; + for (AprPoller poller: pollers) { + int rem = poller.remaining(); + if (rem > max) { + target = poller; + max = rem; + } + } + } + if (target != null && target.add(ch)) { + return; + } + + // can't be added - add a new poller + synchronized (pollers) { + AprPoller poller = allocatePoller(); + poller.add(ch); + pollers.add(poller); + } + } + + /** + * Called on each accepted socket (for servers) or after connection (client) + * after handshake. + */ + protected void onSocket(@SuppressWarnings("unused") AprSocket s) { + // Defaults to NO-OP. Parameter is used by sub-classes. + } + + private class AcceptorThread extends Thread { + private final int port; + private long serverSockPool = 0; + private long serverSock = 0; + + private long inetAddress; + + AcceptorThread(int port) { + this.port = port; + setDaemon(true); + } + + void prepare() throws IOException { + try { + // Create the pool for the server socket + serverSockPool = Pool.create(getRootPool()); + + int family = Socket.APR_INET; + inetAddress = + Address.info(null, family, port, 0, serverSockPool); + + // Create the APR server socket + serverSock = Socket.create(family, + Socket.SOCK_STREAM, + Socket.APR_PROTO_TCP, serverSockPool); + + if (OS.IS_UNIX) { + Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1); + } + // Deal with the firewalls that tend to drop the inactive sockets + Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1); + // Bind the server socket + int ret = Socket.bind(serverSock, inetAddress); + if (ret != 0) { + throw new IOException("Socket.bind " + ret + " " + + Error.strerror(ret) + " port=" + port); + } + // Start listening on the server socket + ret = Socket.listen(serverSock, backlog ); + if (ret != 0) { + throw new IOException("endpoint.init.listen" + + ret + " " + Error.strerror(ret)); + } + if (OS.IS_WIN32 || OS.IS_WIN64) { + // On Windows set the reuseaddr flag after the bind/listen + Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1); + } + + // Sendfile usage on systems which don't support it cause major problems + if (useSendfile && !Library.APR_HAS_SENDFILE) { + useSendfile = false; + } + + // Delay accepting of new connections until data is available + // Only Linux kernels 2.4 + have that implemented + // on other platforms this call is noop and will return APR_ENOTIMPL. + if (deferAccept) { + if (Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1) == Status.APR_ENOTIMPL) { + deferAccept = false; + } + } + } catch (Throwable t) { + throw new IOException(t); + } + } + + void unblock() { + try (java.net.Socket sock = new java.net.Socket()) { + // Easiest ( maybe safest ) way to interrupt accept + // we could have it in non-blocking mode, etc + sock.connect(new InetSocketAddress("127.0.0.1", port)); + } catch (Exception ex) { + // ignore - the acceptor may have shut down by itself. + } + } + + @Override + public void run() { + while (running) { + try { + // each socket has a pool. + final AprSocket ch = newSocket(AprSocketContext.this); + ch.setStatus(AprSocket.ACCEPTED); + + ch.socket = Socket.accept(serverSock); + if (!running) { + break; + } + connectionsCount.incrementAndGet(); + if (connectionsCount.get() % 1000 == 0) { + System.err.println("Accepted: " + connectionsCount.get()); + } + + if (nonBlockingAccept && !sslMode) { + ch.setStatus(AprSocket.CONNECTED); + // TODO: SSL really needs a thread. + onSocket(ch); + } else { + acceptedQueue.add(ch); + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + Socket.close(serverSock); + } + } + + private class AcceptorDispatchThread extends Thread { + + AcceptorDispatchThread() { + setDaemon(true); + } + + @Override + public void run() { + while(running) { + try { + AprSocket ch = acceptedQueue.take(); + if (ch == END) { + return; + } + connectExecutor.execute(ch); + } catch (InterruptedException e) { + } + } + } + } + + /** + * Create the poller. With some versions of APR, the maximum poller size will + * be 62 (recompiling APR is necessary to remove this limitation). + * @throws IOException + */ + AprPoller allocatePoller() throws IOException { + long pool = Pool.create(getRootPool()); + int size = maxConnections / pollerThreadCount; + + long serverPollset = allocatePoller(size, pool); + + if (serverPollset == 0 && size > 1024) { + log.severe("Falling back to 1024-sized poll, won't scale"); + size = 1024; + serverPollset = allocatePoller(size, pool); + } + if (serverPollset == 0) { + log.severe("Falling back to 62-sized poll, won't scale"); + size = 62; + serverPollset = allocatePoller(size, pool); + } + + AprPoller res = new AprPoller(); + res.pool = pool; + res.serverPollset = serverPollset; + res.desc = new long[size * 2]; + res.size = size; + res.id = contextId++; + res.setDaemon(true); + res.setName("AprPoller-" + res.id); + res.start(); + if (debugPoll && !sizeLogged) { + sizeLogged = true; + log.info("Poller size " + (res.desc.length / 2)); + } + return res; + } + + // Removed the 'thread safe' updates for now, to simplify the code + // last test shows a small improvement, can switch later. + private static boolean sizeLogged = false; + + protected long allocatePoller(int size, long pool) { + int flag = threadSafe ? Poll.APR_POLLSET_THREADSAFE: 0; + for (int i = 0; i < 2; i++) { + try { + // timeout must be -1 - or ttl will take effect, strange results. + return Poll.create(size, pool, flag, -1); + } catch (Error e) { + e.printStackTrace(); + if (Status.APR_STATUS_IS_EINVAL(e.getError())) { + log.info(" endpoint.poll.limitedpollsize " + size); + return 0; + } else if (Status.APR_STATUS_IS_ENOTIMPL(e.getError())) { + // thread safe not supported + log.severe("THREAD SAFE NOT SUPPORTED" + e); + threadSafe = false; + // try again without the flags + continue; + } else { + log.severe("endpoint.poll.initfail" + e); + return 0; + } + } + } + log.severe("Unexpected ENOTIMPL with flag==0"); + return 0; + } + + class AprPoller extends Thread { + + private int id; + private int size; + private long serverPollset = 0; + private long pool = 0; + private long[] desc; + + private long lastPoll; + private long lastPollTime; + private final AtomicBoolean inPoll = new AtomicBoolean(false); + + // Should be replaced with socket data. + // used only to lookup by socket + private final Map<Long, AprSocket> channels = new HashMap<>(); + + // Active + pending, must be < desc.length / 2 + // The channel will also have poller=this when active or pending + // How many sockets have poller == this + private final AtomicInteger keepAliveCount = new AtomicInteger(); + // Tracks desc, how many sockets are actively polled + private final AtomicInteger polledCount = new AtomicInteger(); + + private final AtomicInteger pollCount = new AtomicInteger(); + + private final List<AprSocket> updates = new ArrayList<>(); + + @Override + public void run() { + if (!running) { + return; + } + if (debugPoll) { + log.info("Starting poller " + id + " " + (isServer() ? "SRV ": "CLI ")); + } + long t0 = System.currentTimeMillis(); + while (running) { + try { + updates(); + + // Pool for the specified interval. Remove signaled sockets + synchronized (this) { + inPoll.set(true); + } + // if updates are added after updates and poll - interrupt will have still + // work + + int rv = Poll.poll(serverPollset, pollTime, desc, true); + synchronized (this) { + inPoll.set(false); + if (!running) { + break; + } + } + + pollCount.incrementAndGet(); + lastPoll = System.currentTimeMillis(); + lastPollTime = lastPoll - t0; + + if (rv > 0) { + if (debugPoll) { + log.info(" Poll() id=" + id + " rv=" + rv + " keepAliveCount=" + keepAliveCount + + " polled = " + polledCount.get() + + " time=" + lastPollTime); + } + polledCount.addAndGet(-rv); + for (int pollIdx = 0; pollIdx < rv; pollIdx++) { + long sock = desc[pollIdx * 2 + 1]; + AprSocket ch; + boolean blocking = false; + + synchronized (channels) { + ch = channels.get(Long.valueOf(sock)); + if (ch != null) { + blocking = ch.isBlocking(); + } else { + log.severe("Polled socket not found !!!!!" + Long.toHexString(sock)); + // TODO: destroy/close the raw socket + continue; + } + } + // was removed from polling + ch.clearStatus(AprSocket.POLL); + + // We just removed it ( see last param to poll()). + // Check for failed sockets and hand this socket off to a worker + long mask = desc[pollIdx * 2]; + + boolean err = ((mask & Poll.APR_POLLERR) == Poll.APR_POLLERR); + boolean nval = ((mask & Poll.APR_POLLNVAL) != 0); + if (err || nval) { + System.err.println("ERR " + err + " NVAL " + nval); + } + + boolean out = (mask & Poll.APR_POLLOUT) == Poll.APR_POLLOUT; + boolean in = (mask & Poll.APR_POLLIN) == Poll.APR_POLLIN; + if (debugPoll) { + log.info(" Poll channel: " + Long.toHexString(mask) + + (out ? " OUT" :"") + + (in ? " IN": "") + + (err ? " ERR" : "") + + " Ch: " + ch); + } + + // will be set again in process(), if all read/write is done + ch.clearStatus(AprSocket.POLLOUT); + ch.clearStatus(AprSocket.POLLIN); + + // try to send if needed + if (blocking) { + synchronized (ch) { + ch.notifyAll(); + } + getExecutor().execute(ch); + } else { + ((AprSocketContext.NonBlockingPollHandler) ch.handler).process(ch, in, out, false); + + // Update polling for the channel (in IO thread, safe) + updateIOThread(ch); + } + } + } else if (rv < 0) { + int errn = -rv; + if (errn == Status.TIMEUP) { + // to or interrupt +// if (debugPoll) { +// log.info(" Poll() timeup" + " keepAliveCount=" + keepAliveCount + +// " polled = " + polledCount.get() +// + " time=" + lastPollTime); +// } + } else if (errn == Status.EINTR) { + // interrupt - no need to log + } else { + if (debugPoll) { + log.info(" Poll() rv=" + rv + " keepAliveCount=" + keepAliveCount + + " polled = " + polledCount.get() + + " time=" + lastPollTime); + } + /* Any non timeup or interrupted error is critical */ + if (errn > Status.APR_OS_START_USERERR) { + errn -= Status.APR_OS_START_USERERR; + } + log.severe("endpoint.poll.fail " + errn + " " + Error.strerror(errn)); + // Handle poll critical failure + synchronized (this) { + destroyPoller(); // will close all sockets + } + continue; + } + } + // TODO: timeouts + } catch (Throwable t) { + log.log(Level.SEVERE, "endpoint.poll.error", t); + } + + } + if (!running) { + destroyPoller(); + } + } + + /** + * Destroy the poller. + */ + protected void destroyPoller() { + synchronized (pollers) { + pollers.remove(this); + } + log.info("Poller stopped after cnt=" + + pollCount.get() + + " sockets=" + channels.size() + + " lastPoll=" + lastPoll); + + // Close all sockets + synchronized (this) { + if (serverPollset == 0) { + return; + } + +// for (AprSocket ch: channels.values()) { +// ch.poller = null; +// ch.reset(); +// } + keepAliveCount.set(0); + log.warning("Destroy pollset"); + //serverPollset = 0; + } + Pool.destroy(pool); + pool = 0; + synchronized (pollers) { + // Now we can destroy the root pool + if (pollers.size() == 0 && !running) { + log.info("Destroy server context"); +// AprSocketContext.this.destroy(); + } + } + } + + /** + * Called only in poller thread, only used if not thread safe + * @throws IOException + */ + protected void updates() throws IOException { + synchronized (this) { + for (AprSocket up: updates) { + updateIOThread(up); + } + updates.clear(); + } + } + + void interruptPoll() { + try { + int rc = Status.APR_SUCCESS; + synchronized (this) { + if (serverPollset != 0) { + rc = Poll.interrupt(serverPollset); + } else { + log.severe("Interrupt with closed pollset"); + } + } + if (rc != Status.APR_SUCCESS) { + log.severe("Failed interrupt and not thread safe"); + } + } catch (Throwable t) { + t.printStackTrace(); + if (pollTime > FALLBACK_POLL_TIME) { + pollTime = FALLBACK_POLL_TIME; + } + } + } + + + int remaining() { + synchronized (channels) { + return (desc.length - channels.size() * 2); + } + } + + + + /** + * Called from any thread, return true if we could add it + * to pending. + */ + boolean add(AprSocket ch) throws IOException { + synchronized (this) { + if (!running) { + return false; + } + if (keepAliveCount.get() >= size) { + return false; + } + keepAliveCount.incrementAndGet(); + ch.poller = this; + } + + requestUpdate(ch); + + return true; + } + + /** + * May be called outside of IOThread. + */ + protected void requestUpdate(AprSocket ch) throws IOException { + synchronized (this) { + if (!running) { + return; + } + } + if (isPollerThread()) { + updateIOThread(ch); + } else { + synchronized (this) { + if (!updates.contains(ch)) { + updates.add(ch); + } + interruptPoll(); + } + if (debugPoll) { + log.info("Poll: requestUpdate " + id + " " + ch); + } + } + } + + private void updateIOThread(AprSocket ch) throws IOException { + if (!running || ch.socket == 0) { + return; + } + // called from IO thread, either in 'updates' or after + // poll. + //synchronized (ch) + boolean polling = ch.checkPreConnect(AprSocket.POLL); + + int requested = ch.requestedPolling(); + if (requested == 0) { + if (polling) { + removeSafe(ch); + } + if (ch.isClosed()) { + synchronized (channels) { + ch.poller = null; + channels.remove(Long.valueOf(ch.socket)); + } + keepAliveCount.decrementAndGet(); + ch.reset(); + } + } else { + if (polling) { + removeSafe(ch); + } + // will close if error + pollAdd(ch, requested); + } + if (debugPoll) { + log.info("Poll: updated=" + id + " " + ch); + } + } + + /** + * Called only from IO thread + */ + private void pollAdd(AprSocket up, int req) throws IOException { + boolean failed = false; + int rv; + synchronized (channels) { + if (up.isClosed()) { + return; + } + rv = Poll.add(serverPollset, up.socket, req); + if (rv != Status.APR_SUCCESS) { + up.poller = null; + keepAliveCount.decrementAndGet(); + failed = true; + } else { + polledCount.incrementAndGet(); + channels.put(Long.valueOf(up.socket), up); + up.setStatus(AprSocket.POLL); + } + } + if (failed) { + up.reset(); + throw new IOException("poll add error " + rv + " " + up + " " + Error.strerror(rv)); + } + } + + /** + * Called only from IO thread. Remove from Poll and channels, + * set POLL bit to false. + */ + private void removeSafe(AprSocket up) { + int rv = Status.APR_EGENERAL; + if (running && serverPollset != 0 && up.socket != 0 + && !up.isClosed()) { + rv = Poll.remove(serverPollset, up.socket); + } + up.clearStatus(AprSocket.POLL); + + if (rv != Status.APR_SUCCESS) { + log.severe("poll remove error " + Error.strerror(rv) + " " + up); + } else { + polledCount.decrementAndGet(); + } + } + + + public boolean isPollerThread() { + return Thread.currentThread() == this; + } + + } + + /** + * Callback for poll events, will be invoked in a thread pool. + * + */ + public static interface BlockingPollHandler { + + /** + * Called when the socket has been polled for in, out or closed. + * + * + */ + public void process(AprSocket ch, boolean in, boolean out, boolean close); + + + /** + * Called just before the socket is destroyed + */ + public void closed(AprSocket ch); + } + + /** + * Additional callbacks for non-blocking. + * This can be much faster - but it's harder to code, should be used only + * for low-level protocol implementation, proxies, etc. + * + * The model is restricted in many ways to avoid complexity and bugs: + * + * - read can only happen in the IO thread associated with the poller + * - user doesn't control poll interest - it is set automatically based + * on read/write results + * - it is only possible to suspend read, for TCP flow control - also + * only from the IO thread. Resume can happen from any thread. + * - it is also possible to call write() from any thread + */ + public static interface NonBlockingPollHandler extends BlockingPollHandler { + + /** + * Called after connection is established, in a thread pool. + * Process will be called next. + */ + public void connected(AprSocket ch); + + /** + * Before close, if an exception happens. + */ + public void error(AprSocket ch, Throwable t); + } + +}
Added: tomcat/trunk/java/org/apache/tomcat/jni/socket/HostInfo.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/jni/socket/HostInfo.java?rev=1686276&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/jni/socket/HostInfo.java (added) +++ tomcat/trunk/java/org/apache/tomcat/jni/socket/HostInfo.java Thu Jun 18 17:10:08 2015 @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.jni.socket; + +import java.io.Serializable; + +/** + * Information about the remote host. Persisting this in memcache or similar + * storage can improve performance on future TLS connections by skipping roundtrips + * and reducing CPU use in handshake. + * + * This class is used in both server and client mode. + * + * AprSocketContextLitener.getPeer(name) can be used to read from an external storage. + * + * TODO: also save the SPDY persistent settings here. + * TODO: fix tickets, don't seem to work anymore. + */ +public class HostInfo implements Serializable { + + private static final long serialVersionUID = 1L; + + public String host; + + public int port; + + public boolean secure; + + /** + * Raw cert data (x.509 format). + * This is retrieved when a full handshake happens - if session reuse or tickets + * are used you'll not receive the certs again. + */ + public byte[][] certs; + + public byte[] ticket; + public int ticketLen; + + public String sessionId; + + /** + * DER-encoded session data. + */ + public byte[] sessDer; + + /** + * Negotiated NPN. + */ + byte[] npn; + int npnLen; + + public HostInfo() { + } + + public HostInfo(String host, int port, boolean secure) { + this.host = host; + this.port = port; + this.secure = secure; + } + + public String getNpn() { + return new String(npn, 0, npnLen); + } + + public void setNpn(String npn) { + if (npn == null) { + npnLen = 0; + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org