Author: markt Date: Thu May 26 15:27:52 2011 New Revision: 1127962 URL: http://svn.apache.org/viewvc?rev=1127962&view=rev Log: Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=51240 Replace the more generic CounterLatch (that has concurrency issues) with a more specific LimitLatch that (mostly) only provides the functionality required by the connectors to implement maxConnections. It also adds support for dynamically modifying maxConnections.
Added: tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java - copied, changed from r1125824, tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java - copied, changed from r1125824, tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java Removed: tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/trunk/webapps/docs/changelog.xml Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=1127962&r1=1127961&r2=1127962&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Thu May 26 15:27:52 2011 @@ -31,7 +31,7 @@ import javax.net.ssl.KeyManagerFactory; import org.apache.juli.logging.Log; import org.apache.tomcat.util.IntrospectionUtils; import org.apache.tomcat.util.res.StringManager; -import org.apache.tomcat.util.threads.CounterLatch; +import org.apache.tomcat.util.threads.LimitLatch; import org.apache.tomcat.util.threads.ResizableExecutor; import org.apache.tomcat.util.threads.TaskQueue; import org.apache.tomcat.util.threads.TaskThreadFactory; @@ -97,7 +97,7 @@ public abstract class AbstractEndpoint { /** * counter for nr of connections handled by an endpoint */ - private volatile CounterLatch connectionCounterLatch = null; + private volatile LimitLatch connectionLimitLatch = null; /** * Socket properties @@ -111,7 +111,13 @@ public abstract class AbstractEndpoint { // ----------------------------------------------------------------- Properties private int maxConnections = 10000; - public void setMaxConnections(int maxCon) { this.maxConnections = maxCon; } + public void setMaxConnections(int maxCon) { + this.maxConnections = maxCon; + LimitLatch latch = this.connectionLimitLatch; + // Update the latch that enforces this + latch.setLimit(maxCon); + } + public int getMaxConnections() { return this.maxConnections; } /** * External Executor based thread pool. @@ -550,32 +556,26 @@ public abstract class AbstractEndpoint { protected abstract Log getLog(); public abstract boolean getUseSendfile(); - protected CounterLatch initializeConnectionLatch() { - if (connectionCounterLatch==null) { - connectionCounterLatch = new CounterLatch(0,getMaxConnections()); + protected LimitLatch initializeConnectionLatch() { + if (connectionLimitLatch==null) { + connectionLimitLatch = new LimitLatch(getMaxConnections()); } - return connectionCounterLatch; + return connectionLimitLatch; } protected void releaseConnectionLatch() { - CounterLatch latch = connectionCounterLatch; + LimitLatch latch = connectionLimitLatch; if (latch!=null) latch.releaseAll(); - connectionCounterLatch = null; - } - - protected void awaitConnection() throws InterruptedException { - CounterLatch latch = connectionCounterLatch; - if (latch!=null) latch.await(); + connectionLimitLatch = null; } - protected long countUpConnection() { - CounterLatch latch = connectionCounterLatch; - if (latch!=null) return latch.countUp(); - else return -1; + protected void countUpOrAwaitConnection() throws InterruptedException { + LimitLatch latch = connectionLimitLatch; + if (latch!=null) latch.countUpOrAwait(); } protected long countDownConnection() { - CounterLatch latch = connectionCounterLatch; + LimitLatch latch = connectionLimitLatch; if (latch!=null) { long result = latch.countDown(); if (result<0) { Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1127962&r1=1127961&r2=1127962&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu May 26 15:27:52 2011 @@ -949,7 +949,7 @@ public class AprEndpoint extends Abstrac } try { //if we have reached max connections, wait - awaitConnection(); + countUpOrAwaitConnection(); long socket = 0; try { @@ -965,8 +965,6 @@ public class AprEndpoint extends Abstrac // Successful accept, reset the error delay errorDelay = 0; - //increment socket count - countUpConnection(); /* * In the case of a deferred accept unlockAccept needs to * send data. This data will be rubbish, so destroy the Modified: tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java?rev=1127962&r1=1127961&r2=1127962&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java Thu May 26 15:27:52 2011 @@ -211,8 +211,8 @@ public class JIoEndpoint extends Abstrac } try { //if we have reached max connections, wait - awaitConnection(); - + countUpOrAwaitConnection(); + Socket socket = null; try { // Accept the next incoming connection from the server @@ -237,8 +237,6 @@ public class JIoEndpoint extends Abstrac } catch (IOException e) { // Ignore } - } else { - countUpConnection(); } } else { // Close socket right away Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1127962&r1=1127961&r2=1127962&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu May 26 15:27:52 2011 @@ -761,7 +761,7 @@ public class NioEndpoint extends Abstrac } try { //if we have reached max connections, wait - awaitConnection(); + countUpOrAwaitConnection(); SocketChannel socket = null; try { @@ -791,8 +791,6 @@ public class NioEndpoint extends Abstrac if (log.isDebugEnabled()) log.debug("", ix); } - } else { - countUpConnection(); } } } catch (SocketTimeoutException sx) { Copied: tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java (from r1125824, tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java) URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java?p2=tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java&p1=tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java&r1=1125824&r2=1127962&rev=1127962&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java Thu May 26 15:27:52 2011 @@ -17,22 +17,15 @@ package org.apache.tomcat.util.threads; import java.util.Collection; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.AbstractQueuedSynchronizer; + /** - * Simple counter latch that allows code to keep an up and down counter, and waits while the latch holds a certain wait value. - * and threads using the latch to wait if the count has reached a certain value. - * The counter latch can be used to keep track of an atomic counter, since the operations {@link #countDown()} - * and {@link #countUp()} are atomic. - * When the latch reaches the wait value, threads will block. The counter latch can hence act like a - * count down latch or a count up latch, while letting you keep track of the counter as well. - * This counter latch works opposite as the java.util.concurrent.CountDownLatch, since the CounterLatch only blocks on a single value and releases the threads on all other values. - * @author fhanik - * @see <a href="http://download.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html">CountDownLatch</a> - * + * Shared latch that allows the latch to be acquired a limited number of times + * after which all subsequent requests to acquire the latch will be placed in a + * FIFO queue until one of the shares is returned. */ -public class CounterLatch { +public class LimitLatch { private class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1L; @@ -41,139 +34,111 @@ public class CounterLatch { } @Override - protected int tryAcquireShared(int arg) { - return ((!released) && count.get() == signal) ? -1 : 1; + protected int tryAcquireShared(int ignored) { + long newCount = count.incrementAndGet(); + if (!released && newCount > limit) { + // Limit exceeded + count.decrementAndGet(); + return -1; + } else { + return 1; + } } @Override protected boolean tryReleaseShared(int arg) { + count.decrementAndGet(); return true; } } private final Sync sync; private final AtomicLong count; - private volatile long signal; + private volatile long limit; private volatile boolean released = false; /** - * Instantiates a CounterLatch object with an initial value and a wait value. - * @param initial - initial value of the counter - * @param waitValue - when the counter holds this value, - * threads calling {@link #await()} or {@link #await(long, TimeUnit)} - * will wait until the counter changes value or until they are interrupted. - */ - public CounterLatch(long initial, long waitValue) { - this.signal = waitValue; - this.count = new AtomicLong(initial); + * Instantiates a LimitLatch object with an initial limit. + * @param limit - maximum number of concurrent acquisitions of this latch + */ + public LimitLatch(long limit) { + this.limit = limit; + this.count = new AtomicLong(0); this.sync = new Sync(); } /** - * Causes the calling thread to wait if the counter holds the waitValue. - * If the counter holds any other value, the thread will return - * If the thread is interrupted or becomes interrupted an InterruptedException is thrown - * @throws InterruptedException + * Obtain the current limit. */ - public void await() throws InterruptedException { - sync.acquireSharedInterruptibly(1); + public long getLimit() { + return limit; } + /** - * Causes the calling thread to wait if the counter holds the waitValue. - * If the counter holds any other value, the thread will return - * If the thread is interrupted or becomes interrupted an InterruptedException is thrown - * @return true if the value changed, false if the timeout has elapsed - * @throws InterruptedException + * Sets a new limit. If the limit is decreased there may be a period where + * more shares of the latch are acquired than the limit. In this case no + * more shares of the latch will be issued until sufficient shares have been + * returned to reduce the number of acquired shares of the latch to below + * the new limit. If the limit is increased, threads currently in the queue + * may not be issued one of the newly available shares until the next + * request is made for a latch. + * + * @param limit The new limit */ - public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); + public void setLimit(long limit) { + this.limit = limit; } + /** - * Increments the counter - * @return the previous counter value + * Acquires a shared latch if one is available or waits for one if no shared + * latch is current available. */ - public long countUp() { - long previous = count.getAndIncrement(); - if (previous == signal) { - sync.releaseShared(0); - } - return previous; + public void countUpOrAwait() throws InterruptedException { + sync.acquireSharedInterruptibly(1); } /** - * Decrements the counter + * Releases a shared latch, making it available for another thread to use. * @return the previous counter value */ public long countDown() { - long previous = count.getAndDecrement(); - if (previous == signal) { - sync.releaseShared(0); - } - return previous; + sync.releaseShared(0); + return count.get(); } /** - * Returns the current counter value - * @return the current counter value + * Releases all waiting threads and causes the {@link #limit} to be ignored + * until {@link #reset()} is called. */ - public long getCount() { - return count.get(); + public boolean releaseAll() { + released = true; + return sync.releaseShared(0); } /** - * Performs an atomic update of the counter - * If the operation is successful and {@code expect==waitValue && expect!=update} waiting threads will be released. - * @param expect - the expected counter value - * @param update - the new counter value - * @return <code>true</code> if successful, <code>false</code> if the - * current value wasn't as expected - */ - public boolean compareAndSet(long expect, long update) { - boolean result = count.compareAndSet(expect, update); - if (result && expect==signal && expect != update) { - sync.releaseShared(0); - } - return result; + * Resets the latch and initializes the shared acquisition counter to zero. + * @see #releaseAll() + */ + public void reset() { + this.count.set(0); + released = false; } /** - * returns true if there are threads blocked by this latch - * @return true if there are threads blocked by this latch + * Returns <code>true</code> if there is at least one thread waiting to + * acquire the shared lock, otherwise returns <code>false</code>. */ public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } - + /** - * Returns a collection of the blocked threads - * @return a collection of the blocked threads + * Provide access to the list of threads waiting to acquire this limited + * shared latch. */ public Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } - - /** - * releases all waiting threads. This operation is permanent, and no threads will block, - * even if the counter hits the {@code waitValue} until {@link #reset(long)} has been called. - * @return <code>true</code> if this release of shared mode may permit a - * waiting acquire (shared or exclusive) to succeed; and - * <code>false</code> otherwise - */ - public boolean releaseAll() { - released = true; - return sync.releaseShared(0); - } - - /** - * Resets the latch and initializes the counter with the new value. - * @param value the new counter value - * @see #releaseAll() - */ - public void reset(long value) { - this.count.set(value); - released = false; - } - } Copied: tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java (from r1125824, tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java) URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java?p2=tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java&p1=tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java&r1=1125824&r2=1127962&rev=1127962&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java (original) +++ tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java Thu May 26 15:27:52 2011 @@ -18,108 +18,111 @@ package org.apache.tomcat.util.threads; import junit.framework.TestCase; -public class TestCounterLatch extends TestCase { +public class TestLimitLatch extends TestCase { - private volatile CounterLatch latch = null; + private volatile LimitLatch latch = null; @Override public void tearDown() { - CounterLatch temp = latch; + LimitLatch temp = latch; if (temp!=null) temp.releaseAll(); latch = null; } public void testNoThreads() throws Exception { - latch = new CounterLatch(0,0); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + latch = new LimitLatch(0); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); } public void testOneThreadNoWait() throws Exception { - latch = new CounterLatch(0,1); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - Thread testThread = new Thread() { - @Override - public void run() { - try { - latch.await(); - } catch (InterruptedException x) { - x.printStackTrace(); - } - } - }; + latch = new LimitLatch(1); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + Thread testThread = new TestThread(); testThread.start(); Thread.sleep(50); - assertEquals("0 threads should be waiting", 0, latch.getQueuedThreads().size()); - latch.countUp(); + assertEquals("0 threads should be waiting", 0, + latch.getQueuedThreads().size()); + latch.countUpOrAwait(); Thread.sleep(50); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); } public void testOneThreadWaitCountUp() throws Exception { - latch = new CounterLatch(0,1); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - Thread testThread = new Thread() { - @Override - public void run() { - try { - latch.await(); - } catch (InterruptedException x) { - x.printStackTrace(); - } - } - }; - latch.countUp(); - testThread.start(); - Thread.sleep(50); - assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size()); - latch.countUp(); - Thread.sleep(50); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - } - - public void testOneThreadWaitCountDown() throws Exception { - latch = new CounterLatch(1,0); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - Thread testThread = new Thread() { - @Override - public void run() { - try { - //System.out.println("Entering ["+Thread.currentThread().getName()+"]"); - latch.await(); - } catch (InterruptedException x) { - x.printStackTrace(); - } - //System.out.println("Exiting ["+Thread.currentThread().getName()+"]"); - } - }; - latch.countDown(); + latch = new LimitLatch(1); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + Thread testThread = new TestThread(); + latch.countUpOrAwait(); testThread.start(); Thread.sleep(50); - assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size()); + assertEquals("1 threads should be waiting", 1, + latch.getQueuedThreads().size()); latch.countDown(); Thread.sleep(50); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); } - + public void testOneRelease() throws Exception { - latch = new CounterLatch(1,0); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - Thread testThread = new Thread() { - @Override - public void run() { - try { - latch.await(); - } catch (InterruptedException x) { - x.printStackTrace(); - } - } - }; - latch.countDown(); + latch = new LimitLatch(1); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + Thread testThread = new TestThread(); + latch.countUpOrAwait(); testThread.start(); Thread.sleep(50); - assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size()); + assertEquals("1 threads should be waiting", 1, + latch.getQueuedThreads().size()); latch.releaseAll(); Thread.sleep(50); - assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); - } + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + } + + public void testTenWait() throws Exception { + latch = new LimitLatch(10); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + Thread[] testThread = new TestThread[30]; + for (int i = 0; i < 30; i++) { + testThread[i] = new TestThread(1000); + testThread[i].start(); + } + Thread.sleep(50); + assertEquals("20 threads should be waiting", 20, + latch.getQueuedThreads().size()); + Thread.sleep(1000); + assertEquals("10 threads should be waiting", 10, + latch.getQueuedThreads().size()); + Thread.sleep(1000); + assertEquals("No threads should be waiting", false, + latch.hasQueuedThreads()); + } + + private class TestThread extends Thread { + + private int holdTime; + + public TestThread() { + this(100); + } + + public TestThread(int holdTime) { + this.holdTime = holdTime; + } + + @Override + public void run() { + try { + latch.countUpOrAwait(); + Thread.sleep(holdTime); + latch.countDown(); + } catch (InterruptedException x) { + x.printStackTrace(); + } + } + } } Modified: tomcat/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1127962&r1=1127961&r2=1127962&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/changelog.xml (original) +++ tomcat/trunk/webapps/docs/changelog.xml Thu May 26 15:27:52 2011 @@ -82,6 +82,10 @@ Include a comment header in generated java files that indicates when the file was generated and which version of Tomcat generated it. (markt) </add> + <fix> + <bug>51240</bug>: Ensure that maxConnections limit is enforced when + multiple acceptor threads are configured. (markt) + </fix> </changelog> </subsection> <subsection name="Cluster"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org