This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 7.0.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git
View the commit online: https://github.com/apache/tomcat/commit/7e853199d4ce0db4cc173f80b39a616890b64dee commit 7e853199d4ce0db4cc173f80b39a616890b64dee Author: Mark Thomas <ma...@apache.org> AuthorDate: Tue Nov 19 17:03:28 2019 +0000 Refactor APR Poller to remove use of multiple pollsets --- java/org/apache/tomcat/util/net/AprEndpoint.java | 465 +++++++++-------------- webapps/docs/changelog.xml | 5 + 2 files changed, 187 insertions(+), 283 deletions(-) diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java index a31430d..dfca17b 100644 --- a/java/org/apache/tomcat/util/net/AprEndpoint.java +++ b/java/org/apache/tomcat/util/net/AprEndpoint.java @@ -1409,39 +1409,17 @@ public class AprEndpoint extends AbstractEndpoint<Long> { // ------------------------------------------------------ Poller Inner Class - public class Poller implements Runnable { + public class Poller implements Runnable { /** - * Pointers to the pollers. + * Pointer to the poller. */ - protected long[] pollers = null; + private long aprPoller; /** * Actual poller size. */ - protected int actualPollerSize = 0; - - /** - * Amount of spots left in the poller. - */ - protected int[] pollerSpace = null; - - /** - * Amount of low level pollers in use by this poller. - */ - protected int pollerCount; - - /** - * Timeout value for the poll call. - */ - protected int pollerTime; - - /** - * Variable poller timeout that adjusts depending on how many poll sets - * are in use so that the total poll time across all poll sets remains - * equal to pollTime. - */ - private int nextPollerTime; + private int pollerSize = 0; /** * Root pool. @@ -1495,60 +1473,23 @@ public class AprEndpoint extends AbstractEndpoint<Long> { private volatile boolean pollerRunning = true; /** - * Create the poller. With some versions of APR, the maximum poller size - * will be 62 (recompiling APR is necessary to remove this limitation). + * Create the poller. */ protected void init() { pool = Pool.create(serverSockPool); - - // Single poller by default - int defaultPollerSize = getMaxConnections(); - - if ((OS.IS_WIN32 || OS.IS_WIN64) && (defaultPollerSize > 1024)) { - // The maximum per poller to get reasonable performance is 1024 - // Adjust poller size so that it won't reach the limit. This is - // a limitation of XP / Server 2003 that has been fixed in - // Vista / Server 2008 onwards. - actualPollerSize = 1024; - } else { - actualPollerSize = defaultPollerSize; - } - - timeouts = new SocketTimeouts(defaultPollerSize); + pollerSize = getMaxConnections(); + timeouts = new SocketTimeouts(pollerSize); // At the moment, setting the timeout is useless, but it could get // used again as the normal poller could be faster using maintain. // It might not be worth bothering though. 
- long pollset = allocatePoller(actualPollerSize, pool, -1); - if (pollset == 0 && actualPollerSize > 1024) { - actualPollerSize = 1024; - pollset = allocatePoller(actualPollerSize, pool, -1); - } - if (pollset == 0) { - actualPollerSize = 62; - pollset = allocatePoller(actualPollerSize, pool, -1); - } - - pollerCount = defaultPollerSize / actualPollerSize; - pollerTime = pollTime / pollerCount; - nextPollerTime = pollerTime; - - pollers = new long[pollerCount]; - pollers[0] = pollset; - for (int i = 1; i < pollerCount; i++) { - pollers[i] = allocatePoller(actualPollerSize, pool, -1); - } - - pollerSpace = new int[pollerCount]; - for (int i = 0; i < pollerCount; i++) { - pollerSpace[i] = actualPollerSize; - } + aprPoller = allocatePoller(pollerSize, pool, -1); - desc = new long[actualPollerSize * 2]; + desc = new long[pollerSize * 2]; connectionCount.set(0); - addList = new SocketList(defaultPollerSize); - closeList = new SocketList(defaultPollerSize); + addList = new SocketList(pollerSize); + closeList = new SocketList(pollerSize); } @@ -1590,15 +1531,13 @@ public class AprEndpoint extends AbstractEndpoint<Long> { } addList.clear(); // Close all sockets still in the poller - for (int i = 0; i < pollerCount; i++) { - int rv = Poll.pollset(pollers[i], desc); - if (rv > 0) { - for (int n = 0; n < rv; n++) { - boolean comet = connections.get( - Long.valueOf(desc[n*2+1])).isComet(); - if (!comet || !processSocket(desc[n*2+1], SocketStatus.STOP)) { - destroySocket(desc[n*2+1]); - } + int rv = Poll.pollset(aprPoller, desc); + if (rv > 0) { + for (int n = 0; n < rv; n++) { + boolean comet = connections.get( + Long.valueOf(desc[n*2+1])).isComet(); + if (!comet || !processSocket(desc[n*2+1], SocketStatus.STOP)) { + destroySocket(desc[n*2+1]); } } } @@ -1669,16 +1608,10 @@ public class AprEndpoint extends AbstractEndpoint<Long> { * {@link Poller#run()}. 
*/ protected boolean addToPoller(long socket, int events) { - int rv = -1; - for (int i = 0; i < pollers.length; i++) { - if (pollerSpace[i] > 0) { - rv = Poll.add(pollers[i], socket, events); - if (rv == Status.APR_SUCCESS) { - pollerSpace[i]--; - connectionCount.incrementAndGet(); - return true; - } - } + int rv = Poll.add(aprPoller, socket, events); + if (rv == Status.APR_SUCCESS) { + connectionCount.incrementAndGet(); + return true; } return false; } @@ -1708,19 +1641,12 @@ public class AprEndpoint extends AbstractEndpoint<Long> { log.debug(sm.getString("endpoint.debug.pollerRemove", Long.valueOf(socket))); } - int rv = -1; - for (int i = 0; i < pollers.length; i++) { - if (pollerSpace[i] < actualPollerSize) { - rv = Poll.remove(pollers[i], socket); - if (rv != Status.APR_NOTFOUND) { - pollerSpace[i]++; - connectionCount.decrementAndGet(); - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.debug.pollerRemoved", - Long.valueOf(socket))); - } - break; - } + int rv = Poll.remove(aprPoller, socket); + if (rv != Status.APR_NOTFOUND) { + connectionCount.decrementAndGet(); + if (log.isDebugEnabled()) { + log.debug(sm.getString("endpoint.debug.pollerRemoved", + Long.valueOf(socket))); } } timeouts.remove(socket); @@ -1765,15 +1691,13 @@ public class AprEndpoint extends AbstractEndpoint<Long> { public String toString() { StringBuffer buf = new StringBuffer(); buf.append("Poller"); - long[] res = new long[actualPollerSize * 2]; - for (int i = 0; i < pollers.length; i++) { - int count = Poll.pollset(pollers[i], res); - buf.append(" [ "); - for (int j = 0; j < count; j++) { - buf.append(desc[2*j+1]).append(" "); - } - buf.append("]"); + long[] res = new long[pollerSize * 2]; + int count = Poll.pollset(aprPoller, res); + buf.append(" [ "); + for (int j = 0; j < count; j++) { + buf.append(desc[2*j+1]).append(" "); } + buf.append("]"); return buf.toString(); } @@ -1909,126 +1833,100 @@ public class AprEndpoint extends AbstractEndpoint<Long> { } } - // Poll for the specified interval - for (int i = 0; i < pollers.length; i++) { - - // Flags to ask to reallocate the pool - boolean reset = false; - //ArrayList<Long> skip = null; - - int rv = 0; - // Reset the nextPollerTime - nextPollerTime = pollerTime; - // Iterate on each pollers, but no need to poll empty pollers - if (pollerSpace[i] < actualPollerSize) { - rv = Poll.poll(pollers[i], nextPollerTime, desc, true); - // Reset the nextPollerTime - nextPollerTime = pollerTime; - } else { - // Skipping an empty poll set means skipping a wait - // time of pollerTime microseconds. If most of the - // poll sets are skipped then this loop will be - // tighter than expected which could lead to higher - // than expected CPU usage. Extending the - // nextPollerTime ensures that this loop always - // takes about the same time to execute. 
- nextPollerTime += pollerTime; - } - if (rv > 0) { - pollerSpace[i] += rv; - connectionCount.addAndGet(-rv); - for (int n = 0; n < rv; n++) { - if (getLog().isDebugEnabled()) { - log.debug(sm.getString( - "endpoint.debug.pollerProcess", - Long.valueOf(desc[n*2+1]), + // Flag to ask to reallocate the pool + boolean reset = false; + + int rv = Poll.poll(aprPoller, pollTime, desc, true); + if (rv > 0) { + connectionCount.addAndGet(-rv); + for (int n = 0; n < rv; n++) { + if (getLog().isDebugEnabled()) { + log.debug(sm.getString( + "endpoint.debug.pollerProcess", + Long.valueOf(desc[n*2+1]), + Long.valueOf(desc[n*2]))); + } + long timeout = timeouts.remove(desc[n*2+1]); + AprSocketWrapper wrapper = connections.get( + Long.valueOf(desc[n*2+1])); + if (wrapper == null) { + // Socket was closed in another thread while still in + // the Poller but wasn't removed from the Poller before + // new data arrived. + continue; + } + wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]); + // Check for failed sockets and hand this socket off to a worker + if (wrapper.isComet()) { + // Event processes either a read or a write depending on what the poller returns + if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) + || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR) + || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) { + if (!processSocket(desc[n*2+1], SocketStatus.ERROR)) { + // Close socket and clear pool + closeSocket(desc[n*2+1]); + } + } else if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) { + if (wrapper.pollerFlags != 0) { + add(desc[n*2+1], 1, wrapper.pollerFlags); + } + if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) { + // Close socket and clear pool + closeSocket(desc[n*2+1]); + } + } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { + if (wrapper.pollerFlags != 0) { + add(desc[n*2+1], 1, wrapper.pollerFlags); + } + if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) { + // Close socket and clear pool + closeSocket(desc[n*2+1]); + } + } else { + // Unknown event + getLog().warn(sm.getString( + "endpoint.apr.pollUnknownEvent", Long.valueOf(desc[n*2]))); + if (!processSocket(desc[n*2+1], SocketStatus.ERROR)) { + // Close socket and clear pool + closeSocket(desc[n*2+1]); + } } - long timeout = timeouts.remove(desc[n*2+1]); - AprSocketWrapper wrapper = connections.get( - Long.valueOf(desc[n*2+1])); - if (wrapper == null) { - // Socket was closed in another thread while still in - // the Poller but wasn't removed from the Poller before - // new data arrived. - continue; - } - wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]); - // Check for failed sockets and hand this socket off to a worker - if (wrapper.isComet()) { - // Event processes either a read or a write depending on what the poller returns - if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) - || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR) - || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) { - if (!processSocket(desc[n*2+1], SocketStatus.ERROR)) { - // Close socket and clear pool - closeSocket(desc[n*2+1]); - } - } else if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) { - if (wrapper.pollerFlags != 0) { - add(desc[n*2+1], 1, wrapper.pollerFlags); - } + } else if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) + || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR) + || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) { + if (wrapper.isUpgraded()) { + // Using non-blocking IO. Need to trigger error handling. 
+ // Poller may return error codes plus the flags it was + // waiting for or it may just return an error code. By + // signalling read/write is possible, a read/write will be + // attempted, fail and that will trigger an exception the + // application will see. + // Check the return flags first, followed by what the socket + // was registered for + if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) { + // Error probably occurred during a non-blocking read if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) { // Close socket and clear pool closeSocket(desc[n*2+1]); } } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { - if (wrapper.pollerFlags != 0) { - add(desc[n*2+1], 1, wrapper.pollerFlags); - } + // Error probably occurred during a non-blocking write if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) { // Close socket and clear pool closeSocket(desc[n*2+1]); } - } else { - // Unknown event - getLog().warn(sm.getString( - "endpoint.apr.pollUnknownEvent", - Long.valueOf(desc[n*2]))); - if (!processSocket(desc[n*2+1], SocketStatus.ERROR)) { + } else if ((wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) { + // Can't tell what was happening when the error occurred but the + // socket is registered for non-blocking read so use that + if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) { // Close socket and clear pool closeSocket(desc[n*2+1]); } - } - } else if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) - || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR) - || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) { - if (wrapper.isUpgraded()) { - // Using non-blocking IO. Need to trigger error handling. - // Poller may return error codes plus the flags it was - // waiting for or it may just return an error code. By - // signalling read/write is possible, a read/write will be - // attempted, fail and that will trigger an exception the - // application will see. 
- // Check the return flags first, followed by what the socket - // was registered for - if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) { - // Error probably occurred during a non-blocking read - if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) { - // Close socket and clear pool - closeSocket(desc[n*2+1]); - } - } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { - // Error probably occurred during a non-blocking write - if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) { - // Close socket and clear pool - closeSocket(desc[n*2+1]); - } - } else if ((wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) { - // Can't tell what was happening when the error occurred but the - // socket is registered for non-blocking read so use that - if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) { - // Close socket and clear pool - closeSocket(desc[n*2+1]); - } - } else if ((wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { - // Can't tell what was happening when the error occurred but the - // socket is registered for non-blocking write so use that - if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) { - // Close socket and clear pool - closeSocket(desc[n*2+1]); - } - } else { + } else if ((wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { + // Can't tell what was happening when the error occurred but the + // socket is registered for non-blocking write so use that + if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) { // Close socket and clear pool closeSocket(desc[n*2+1]); } @@ -2036,82 +1934,83 @@ public class AprEndpoint extends AbstractEndpoint<Long> { // Close socket and clear pool closeSocket(desc[n*2+1]); } - } else if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) - || ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT)) { - boolean error = false; - if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) && - !processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) { - error = true; - // Close socket and clear pool - closeSocket(desc[n*2+1]); - } - if (!error && - ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) && - !processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) { - // Close socket and clear pool - error = true; - closeSocket(desc[n*2+1]); - } - if (!error && wrapper.pollerFlags != 0) { - // If socket was registered for multiple events but - // only some of the occurred, re-register for the - // remaining events. - // timeout is the value of System.currentTimeMillis() that - // was set as the point that the socket will timeout. When - // adding to the poller, the timeout from now in - // milliseconds is required. - // So first, subtract the current timestamp - if (timeout > 0) { - timeout = timeout - System.currentTimeMillis(); - } - // If the socket should have already expired by now, - // re-add it with a very short timeout - if (timeout <= 0) { - timeout = 1; - } - // Should be impossible but just in case since timeout will - // be cast to an int. 
- if (timeout > Integer.MAX_VALUE) { - timeout = Integer.MAX_VALUE; - } - add(desc[n*2+1], (int) timeout, wrapper.pollerFlags); - } } else { - // Unknown event - getLog().warn(sm.getString( - "endpoint.apr.pollUnknownEvent", - Long.valueOf(desc[n*2]))); // Close socket and clear pool closeSocket(desc[n*2+1]); } - } - } else if (rv < 0) { - int errn = -rv; - // Any non timeup or interrupted error is critical - if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) { - if (errn > Status.APR_OS_START_USERERR) { - errn -= Status.APR_OS_START_USERERR; + } else if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) + || ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT)) { + boolean error = false; + if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) && + !processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) { + error = true; + // Close socket and clear pool + closeSocket(desc[n*2+1]); + } + if (!error && + ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) && + !processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) { + // Close socket and clear pool + error = true; + closeSocket(desc[n*2+1]); + } + if (!error && wrapper.pollerFlags != 0) { + // If socket was registered for multiple events but + // only some of the occurred, re-register for the + // remaining events. + // timeout is the value of System.currentTimeMillis() that + // was set as the point that the socket will timeout. When + // adding to the poller, the timeout from now in + // milliseconds is required. + // So first, subtract the current timestamp + if (timeout > 0) { + timeout = timeout - System.currentTimeMillis(); + } + // If the socket should have already expired by now, + // re-add it with a very short timeout + if (timeout <= 0) { + timeout = 1; + } + // Should be impossible but just in case since timeout will + // be cast to an int. 
+ if (timeout > Integer.MAX_VALUE) { + timeout = Integer.MAX_VALUE; + } + add(desc[n*2+1], (int) timeout, wrapper.pollerFlags); } - getLog().error(sm.getString( - "endpoint.apr.pollError", - Integer.valueOf(errn), - Error.strerror(errn))); - // Destroy and reallocate the poller - reset = true; + } else { + // Unknown event + getLog().warn(sm.getString( + "endpoint.apr.pollUnknownEvent", + Long.valueOf(desc[n*2]))); + // Close socket and clear pool + closeSocket(desc[n*2+1]); } } - - if (reset && pollerRunning) { - // Reallocate the current poller - int count = Poll.pollset(pollers[i], desc); - long newPoller = allocatePoller(actualPollerSize, pool, -1); - // Don't restore connections for now, since I have not tested it - pollerSpace[i] = actualPollerSize; - connectionCount.addAndGet(-count); - Poll.destroy(pollers[i]); - pollers[i] = newPoller; + } else if (rv < 0) { + int errn = -rv; + // Any non timeup or interrupted error is critical + if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) { + if (errn > Status.APR_OS_START_USERERR) { + errn -= Status.APR_OS_START_USERERR; + } + getLog().error(sm.getString( + "endpoint.apr.pollError", + Integer.valueOf(errn), + Error.strerror(errn))); + // Destroy and reallocate the poller + reset = true; } + } + if (reset && pollerRunning) { + // Reallocate the current poller + int count = Poll.pollset(aprPoller, desc); + long newPoller = allocatePoller(pollerSize, pool, -1); + // Don't restore connections for now, since I have not tested it + connectionCount.addAndGet(-count); + Poll.destroy(aprPoller); + aprPoller = newPoller; } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 5b5ee4f..75ef806 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -117,6 +117,11 @@ request header to use the shared parsing code and reduce duplication. (markt) </fix> + <scode> + Refactor the APR poller to always use a single pollset now that the + Windows operating systems that required multiple smaller pollsets to be + used are no longer supported. (markt) + </scode> </changelog> </subsection> <subsection name="Jasper"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org