This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 8.5.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit 4508e70e4fcf542c6071ef77c13cd7141abb9bf4 Author: Mark Thomas <ma...@apache.org> AuthorDate: Tue Nov 19 13:18:36 2019 +0000 Refactor APR Poller to remove use of multiple pollsets --- java/org/apache/tomcat/util/net/AprEndpoint.java | 409 +++++++++-------------- webapps/docs/changelog.xml | 9 + 2 files changed, 165 insertions(+), 253 deletions(-) diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java index 5af9fe0..4821279 100644 --- a/java/org/apache/tomcat/util/net/AprEndpoint.java +++ b/java/org/apache/tomcat/util/net/AprEndpoint.java @@ -1061,36 +1061,14 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { public class Poller implements Runnable { /** - * Pointers to the pollers. + * Pointer to the poller. */ - private long[] pollers = null; + private long aprPoller; /** * Actual poller size. */ - private int actualPollerSize = 0; - - /** - * Amount of spots left in the poller. - */ - private int[] pollerSpace = null; - - /** - * Amount of low level pollers in use by this poller. - */ - private int pollerCount; - - /** - * Timeout value for the poll call. - */ - private int pollerTime; - - /** - * Variable poller timeout that adjusts depending on how many poll sets - * are in use so that the total poll time across all poll sets remains - * equal to pollTime. - */ - private int nextPollerTime; + private int pollerSize = 0; /** * Root pool. @@ -1144,55 +1122,18 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { private volatile boolean pollerRunning = true; /** - * Create the poller. With some versions of APR, the maximum poller size - * will be 62 (recompiling APR is necessary to remove this limitation). + * Create the poller. */ protected synchronized void init() { pool = Pool.create(serverSockPool); - - // Single poller by default - int defaultPollerSize = getMaxConnections(); - - if ((OS.IS_WIN32 || OS.IS_WIN64) && (defaultPollerSize > 1024)) { - // The maximum per poller to get reasonable performance is 1024 - // Adjust poller size so that it won't reach the limit. This is - // a limitation of XP / Server 2003 that has been fixed in - // Vista / Server 2008 onwards. - actualPollerSize = 1024; - } else { - actualPollerSize = defaultPollerSize; - } - - timeouts = new SocketTimeouts(defaultPollerSize); + pollerSize = getMaxConnections(); + timeouts = new SocketTimeouts(pollerSize); // At the moment, setting the timeout is useless, but it could get // used again as the normal poller could be faster using maintain. // It might not be worth bothering though. - long pollset = allocatePoller(actualPollerSize, pool, -1); - if (pollset == 0 && actualPollerSize > 1024) { - actualPollerSize = 1024; - pollset = allocatePoller(actualPollerSize, pool, -1); - } - if (pollset == 0) { - actualPollerSize = 62; - pollset = allocatePoller(actualPollerSize, pool, -1); - } - - pollerCount = defaultPollerSize / actualPollerSize; - pollerTime = pollTime / pollerCount; - nextPollerTime = pollerTime; - - pollers = new long[pollerCount]; - pollers[0] = pollset; - for (int i = 1; i < pollerCount; i++) { - pollers[i] = allocatePoller(actualPollerSize, pool, -1); - } - - pollerSpace = new int[pollerCount]; - for (int i = 0; i < pollerCount; i++) { - pollerSpace[i] = actualPollerSize; - } + aprPoller = allocatePoller(pollerSize, pool, -1); /* * x2 - One descriptor for the socket, one for the event(s). @@ -1201,12 +1142,12 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { * for a maximum of two events (read and write) at any one * time. * - * Therefore size is actual poller size *4. + * Therefore size is poller size *4. */ - desc = new long[actualPollerSize * 4]; + desc = new long[pollerSize * 4]; connectionCount.set(0); - addList = new SocketList(defaultPollerSize); - closeList = new SocketList(defaultPollerSize); + addList = new SocketList(pollerSize); + closeList = new SocketList(pollerSize); } @@ -1228,7 +1169,7 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { // still in the poller can cause problems try { this.notify(); - this.wait(pollerCount * pollTime / 1000); + this.wait(pollTime / 1000); } catch (InterruptedException e) { // Ignore } @@ -1257,12 +1198,10 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { } addList.clear(); // Close all sockets still in the poller - for (int i = 0; i < pollerCount; i++) { - int rv = Poll.pollset(pollers[i], desc); - if (rv > 0) { - for (int n = 0; n < rv; n++) { - destroySocket(desc[n*2+1]); - } + int rv = Poll.pollset(aprPoller, desc); + if (rv > 0) { + for (int n = 0; n < rv; n++) { + destroySocket(desc[n*2+1]); } } Pool.destroy(pool); @@ -1303,6 +1242,7 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { // Add socket to the list. Newly added sockets will wait // at most for pollTime before being polled. if (addList.add(socket, timeout, flags)) { + // In case the poller thread is in the idle wait this.notify(); } } @@ -1314,16 +1254,10 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { * {@link Poller#run()}. */ private boolean addToPoller(long socket, int events) { - int rv = -1; - for (int i = 0; i < pollers.length; i++) { - if (pollerSpace[i] > 0) { - rv = Poll.add(pollers[i], socket, events); - if (rv == Status.APR_SUCCESS) { - pollerSpace[i]--; - connectionCount.incrementAndGet(); - return true; - } - } + int rv = Poll.add(aprPoller, socket, events); + if (rv == Status.APR_SUCCESS) { + connectionCount.incrementAndGet(); + return true; } return false; } @@ -1336,6 +1270,7 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { */ private synchronized void close(long socket) { closeList.add(socket, 0, 0); + // In case the poller thread is in the idle wait this.notify(); } @@ -1349,24 +1284,18 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { log.debug(sm.getString("endpoint.debug.pollerRemove", Long.valueOf(socket))); } - int rv = -1; - for (int i = 0; i < pollers.length; i++) { - if (pollerSpace[i] < actualPollerSize) { - rv = Poll.remove(pollers[i], socket); - if (rv != Status.APR_NOTFOUND) { - pollerSpace[i]++; - connectionCount.decrementAndGet(); - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.debug.pollerRemoved", - Long.valueOf(socket))); - } - break; - } + int rv = Poll.remove(aprPoller, socket); + if (rv != Status.APR_NOTFOUND) { + connectionCount.decrementAndGet(); + if (log.isDebugEnabled()) { + log.debug(sm.getString("endpoint.debug.pollerRemoved", + Long.valueOf(socket))); } } timeouts.remove(socket); } + /** * Timeout checks. Must only be called from {@link Poller#run()}. */ @@ -1400,15 +1329,13 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { public String toString() { StringBuffer buf = new StringBuffer(); buf.append("Poller"); - long[] res = new long[actualPollerSize * 2]; - for (int i = 0; i < pollers.length; i++) { - int count = Poll.pollset(pollers[i], res); - buf.append(" [ "); - for (int j = 0; j < count; j++) { - buf.append(desc[2*j+1]).append(" "); - } - buf.append("]"); + long[] res = new long[pollerSize * 2]; + int count = Poll.pollset(aprPoller, res); + buf.append(" [ "); + for (int j = 0; j < count; j++) { + buf.append(desc[2*j+1]).append(" "); } + buf.append("]"); return buf.toString(); } @@ -1533,170 +1460,146 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { } } - // Poll for the specified interval - for (int i = 0; i < pollers.length; i++) { + // Flag to ask to reallocate the pool + boolean reset = false; - // Flag to ask to reallocate the pool - boolean reset = false; - - int rv = 0; - // Reset the nextPollerTime - nextPollerTime = pollerTime; - // Iterate on each pollers, but no need to poll empty pollers - if (pollerSpace[i] < actualPollerSize) { - rv = Poll.poll(pollers[i], nextPollerTime, desc, true); - // Reset the nextPollerTime - nextPollerTime = pollerTime; - } else { - // Skipping an empty poll set means skipping a wait - // time of pollerTime microseconds. If most of the - // poll sets are skipped then this loop will be - // tighter than expected which could lead to higher - // than expected CPU usage. Extending the - // nextPollerTime ensures that this loop always - // takes about the same time to execute. - nextPollerTime += pollerTime; - } - if (rv > 0) { - rv = mergeDescriptors(desc, rv); - pollerSpace[i] += rv; - connectionCount.addAndGet(-rv); - for (int n = 0; n < rv; n++) { - if (getLog().isDebugEnabled()) { - log.debug(sm.getString( - "endpoint.debug.pollerProcess", - Long.valueOf(desc[n*2+1]), - Long.valueOf(desc[n*2]))); - } - long timeout = timeouts.remove(desc[n*2+1]); - AprSocketWrapper wrapper = connections.get( - Long.valueOf(desc[n*2+1])); - if (wrapper == null) { - // Socket was closed in another thread while still in - // the Poller but wasn't removed from the Poller before - // new data arrived. - continue; - } - wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]); - // Check for failed sockets and hand this socket off to a worker - if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) - || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR) - || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) { - // Need to trigger error handling. Poller may return error - // codes plus the flags it was waiting for or it may just - // return an error code. We could handle the error here but - // if we do, there will be no exception associated with the - // error in application code. By signalling read/write is - // possible, a read/write will be attempted, fail and that - // will trigger an exception the application will see. - // Check the return flags first, followed by what the socket - // was registered for - if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) { - // Error probably occurred during a non-blocking read - if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) { - // Close socket and clear pool - closeSocket(desc[n*2+1]); - } - } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { - // Error probably occurred during a non-blocking write - if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) { - // Close socket and clear pool - closeSocket(desc[n*2+1]); - } - } else if ((wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) { - // Can't tell what was happening when the error occurred but the - // socket is registered for non-blocking read so use that - if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) { - // Close socket and clear pool - closeSocket(desc[n*2+1]); - } - } else if ((wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { - // Can't tell what was happening when the error occurred but the - // socket is registered for non-blocking write so use that - if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) { - // Close socket and clear pool - closeSocket(desc[n*2+1]); - } - } else { + int rv = Poll.poll(aprPoller, pollTime, desc, true); + if (rv > 0) { + rv = mergeDescriptors(desc, rv); + connectionCount.addAndGet(-rv); + for (int n = 0; n < rv; n++) { + if (getLog().isDebugEnabled()) { + log.debug(sm.getString( + "endpoint.debug.pollerProcess", + Long.valueOf(desc[n*2+1]), + Long.valueOf(desc[n*2]))); + } + long timeout = timeouts.remove(desc[n*2+1]); + AprSocketWrapper wrapper = connections.get( + Long.valueOf(desc[n*2+1])); + if (wrapper == null) { + // Socket was closed in another thread while still in + // the Poller but wasn't removed from the Poller before + // new data arrived. + continue; + } + wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]); + // Check for failed sockets and hand this socket off to a worker + if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) + || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR) + || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) { + // Need to trigger error handling. Poller may return error + // codes plus the flags it was waiting for or it may just + // return an error code. We could handle the error here but + // if we do, there will be no exception associated with the + // error in application code. By signalling read/write is + // possible, a read/write will be attempted, fail and that + // will trigger an exception the application will see. + // Check the return flags first, followed by what the socket + // was registered for + if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) { + // Error probably occurred during a non-blocking read + if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) { // Close socket and clear pool closeSocket(desc[n*2+1]); } - } else if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) - || ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT)) { - boolean error = false; - if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) && - !processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) { - error = true; + } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { + // Error probably occurred during a non-blocking write + if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) { // Close socket and clear pool closeSocket(desc[n*2+1]); } - if (!error && - ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) && - !processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) { + } else if ((wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) { + // Can't tell what was happening when the error occurred but the + // socket is registered for non-blocking read so use that + if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) { // Close socket and clear pool - error = true; closeSocket(desc[n*2+1]); } - if (!error && wrapper.pollerFlags != 0) { - // If socket was registered for multiple events but - // only some of the occurred, re-register for the - // remaining events. - // timeout is the value of System.currentTimeMillis() that - // was set as the point that the socket will timeout. When - // adding to the poller, the timeout from now in - // milliseconds is required. - // So first, subtract the current timestamp - if (timeout > 0) { - timeout = timeout - System.currentTimeMillis(); - } - // If the socket should have already expired by now, - // re-add it with a very short timeout - if (timeout <= 0) { - timeout = 1; - } - // Should be impossible but just in case since timeout will - // be cast to an int. - if (timeout > Integer.MAX_VALUE) { - timeout = Integer.MAX_VALUE; - } - add(desc[n*2+1], (int) timeout, wrapper.pollerFlags); + } else if ((wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { + // Can't tell what was happening when the error occurred but the + // socket is registered for non-blocking write so use that + if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) { + // Close socket and clear pool + closeSocket(desc[n*2+1]); } } else { - // Unknown event - getLog().warn(sm.getString( - "endpoint.apr.pollUnknownEvent", - Long.valueOf(desc[n*2]))); // Close socket and clear pool closeSocket(desc[n*2+1]); } - } - } else if (rv < 0) { - int errn = -rv; - // Any non timeup or interrupted error is critical - if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) { - if (errn > Status.APR_OS_START_USERERR) { - errn -= Status.APR_OS_START_USERERR; + } else if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) + || ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT)) { + boolean error = false; + if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) && + !processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) { + error = true; + // Close socket and clear pool + closeSocket(desc[n*2+1]); } - getLog().error(sm.getString( - "endpoint.apr.pollError", - Integer.valueOf(errn), - Error.strerror(errn))); - // Destroy and reallocate the poller - reset = true; + if (!error && + ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) && + !processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) { + // Close socket and clear pool + error = true; + closeSocket(desc[n*2+1]); + } + if (!error && wrapper.pollerFlags != 0) { + // If socket was registered for multiple events but + // only some of the occurred, re-register for the + // remaining events. + // timeout is the value of System.currentTimeMillis() that + // was set as the point that the socket will timeout. When + // adding to the poller, the timeout from now in + // milliseconds is required. + // So first, subtract the current timestamp + if (timeout > 0) { + timeout = timeout - System.currentTimeMillis(); + } + // If the socket should have already expired by now, + // re-add it with a very short timeout + if (timeout <= 0) { + timeout = 1; + } + // Should be impossible but just in case since timeout will + // be cast to an int. + if (timeout > Integer.MAX_VALUE) { + timeout = Integer.MAX_VALUE; + } + add(desc[n*2+1], (int) timeout, wrapper.pollerFlags); + } + } else { + // Unknown event + getLog().warn(sm.getString( + "endpoint.apr.pollUnknownEvent", + Long.valueOf(desc[n*2]))); + // Close socket and clear pool + closeSocket(desc[n*2+1]); } } - - if (reset && pollerRunning) { - // Reallocate the current poller - int count = Poll.pollset(pollers[i], desc); - long newPoller = allocatePoller(actualPollerSize, pool, -1); - // Don't restore connections for now, since I have not tested it - pollerSpace[i] = actualPollerSize; - connectionCount.addAndGet(-count); - Poll.destroy(pollers[i]); - pollers[i] = newPoller; + } else if (rv < 0) { + int errn = -rv; + // Any non timeup or interrupted error is critical + if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) { + if (errn > Status.APR_OS_START_USERERR) { + errn -= Status.APR_OS_START_USERERR; + } + getLog().error(sm.getString( + "endpoint.apr.pollError", + Integer.valueOf(errn), + Error.strerror(errn))); + // Destroy and reallocate the poller + reset = true; } + } + if (reset && pollerRunning) { + // Reallocate the current poller + int count = Poll.pollset(aprPoller, desc); + long newPoller = allocatePoller(pollerSize, pool, -1); + // Don't restore connections for now, since I have not tested it + connectionCount.addAndGet(-count); + Poll.destroy(aprPoller); + aprPoller = newPoller; } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); @@ -2146,7 +2049,7 @@ public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ - protected class SocketProcessor extends SocketProcessorBase<Long> { + protected class SocketProcessor extends SocketProcessorBase<Long> { public SocketProcessor(SocketWrapperBase<Long> socketWrapper, SocketEvent event) { super(socketWrapper, event); diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index c1e9292..bf86274 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -45,6 +45,15 @@ issues do not "pop up" wrt. others). --> <section name="Tomcat 8.5.50 (markt)" rtext="in development"> + <subsection name="Coyote"> + <changelog> + <scode> + Refactor the APR poller to always use a single pollset now that the + Windows operating systems that required multiple smaller pollsets to be + used are no longer supported. (markt) + </scode> + </changelog> + </subsection> </section> <section name="Tomcat 8.5.49 (markt)" rtext="release in progress"> <subsection name="Catalina"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org