Author: markt
Date: Thu May 26 15:27:52 2011
New Revision: 1127962
URL: http://svn.apache.org/viewvc?rev=1127962&view=rev
Log:
Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=51240
Replace the more generic CounterLatch (that has concurrency issues) with a more
specific LimitLatch that (mostly) only provides the functionality required by
the connectors to implement maxConnections. It also adds support for
dynamically modifying maxConnections.
Added:
tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java
- copied, changed from r1125824,
tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java
tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java
- copied, changed from r1125824,
tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java
Removed:
tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java
tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java
Modified:
tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/trunk/webapps/docs/changelog.xml
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=1127962&r1=1127961&r2=1127962&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Thu May
26 15:27:52 2011
@@ -31,7 +31,7 @@ import javax.net.ssl.KeyManagerFactory;
import org.apache.juli.logging.Log;
import org.apache.tomcat.util.IntrospectionUtils;
import org.apache.tomcat.util.res.StringManager;
-import org.apache.tomcat.util.threads.CounterLatch;
+import org.apache.tomcat.util.threads.LimitLatch;
import org.apache.tomcat.util.threads.ResizableExecutor;
import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.TaskThreadFactory;
@@ -97,7 +97,7 @@ public abstract class AbstractEndpoint {
/**
* counter for nr of connections handled by an endpoint
*/
- private volatile CounterLatch connectionCounterLatch = null;
+ private volatile LimitLatch connectionLimitLatch = null;
/**
* Socket properties
@@ -111,7 +111,13 @@ public abstract class AbstractEndpoint {
// -----------------------------------------------------------------
Properties
private int maxConnections = 10000;
- public void setMaxConnections(int maxCon) { this.maxConnections = maxCon; }
+ public void setMaxConnections(int maxCon) {
+ this.maxConnections = maxCon;
+ LimitLatch latch = this.connectionLimitLatch;
+ // Update the latch that enforces this
+ latch.setLimit(maxCon);
+ }
+
public int getMaxConnections() { return this.maxConnections; }
/**
* External Executor based thread pool.
@@ -550,32 +556,26 @@ public abstract class AbstractEndpoint {
protected abstract Log getLog();
public abstract boolean getUseSendfile();
- protected CounterLatch initializeConnectionLatch() {
- if (connectionCounterLatch==null) {
- connectionCounterLatch = new CounterLatch(0,getMaxConnections());
+ protected LimitLatch initializeConnectionLatch() {
+ if (connectionLimitLatch==null) {
+ connectionLimitLatch = new LimitLatch(getMaxConnections());
}
- return connectionCounterLatch;
+ return connectionLimitLatch;
}
protected void releaseConnectionLatch() {
- CounterLatch latch = connectionCounterLatch;
+ LimitLatch latch = connectionLimitLatch;
if (latch!=null) latch.releaseAll();
- connectionCounterLatch = null;
- }
-
- protected void awaitConnection() throws InterruptedException {
- CounterLatch latch = connectionCounterLatch;
- if (latch!=null) latch.await();
+ connectionLimitLatch = null;
}
- protected long countUpConnection() {
- CounterLatch latch = connectionCounterLatch;
- if (latch!=null) return latch.countUp();
- else return -1;
+ protected void countUpOrAwaitConnection() throws InterruptedException {
+ LimitLatch latch = connectionLimitLatch;
+ if (latch!=null) latch.countUpOrAwait();
}
protected long countDownConnection() {
- CounterLatch latch = connectionCounterLatch;
+ LimitLatch latch = connectionLimitLatch;
if (latch!=null) {
long result = latch.countDown();
if (result<0) {
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1127962&r1=1127961&r2=1127962&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu May 26
15:27:52 2011
@@ -949,7 +949,7 @@ public class AprEndpoint extends Abstrac
}
try {
//if we have reached max connections, wait
- awaitConnection();
+ countUpOrAwaitConnection();
long socket = 0;
try {
@@ -965,8 +965,6 @@ public class AprEndpoint extends Abstrac
// Successful accept, reset the error delay
errorDelay = 0;
- //increment socket count
- countUpConnection();
/*
* In the case of a deferred accept unlockAccept needs to
* send data. This data will be rubbish, so destroy the
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java?rev=1127962&r1=1127961&r2=1127962&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java Thu May 26
15:27:52 2011
@@ -211,8 +211,8 @@ public class JIoEndpoint extends Abstrac
}
try {
//if we have reached max connections, wait
- awaitConnection();
-
+ countUpOrAwaitConnection();
+
Socket socket = null;
try {
// Accept the next incoming connection from the server
@@ -237,8 +237,6 @@ public class JIoEndpoint extends Abstrac
} catch (IOException e) {
// Ignore
}
- } else {
- countUpConnection();
}
} else {
// Close socket right away
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1127962&r1=1127961&r2=1127962&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu May 26
15:27:52 2011
@@ -761,7 +761,7 @@ public class NioEndpoint extends Abstrac
}
try {
//if we have reached max connections, wait
- awaitConnection();
+ countUpOrAwaitConnection();
SocketChannel socket = null;
try {
@@ -791,8 +791,6 @@ public class NioEndpoint extends Abstrac
if (log.isDebugEnabled())
log.debug("", ix);
}
- } else {
- countUpConnection();
}
}
} catch (SocketTimeoutException sx) {
Copied: tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java (from
r1125824, tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java)
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java?p2=tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java&p1=tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java&r1=1125824&r2=1127962&rev=1127962&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java Thu May 26
15:27:52 2011
@@ -17,22 +17,15 @@
package org.apache.tomcat.util.threads;
import java.util.Collection;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
/**
- * Simple counter latch that allows code to keep an up and down counter, and
waits while the latch holds a certain wait value.
- * and threads using the latch to wait if the count has reached a certain
value.
- * The counter latch can be used to keep track of an atomic counter, since the
operations {@link #countDown()}
- * and {@link #countUp()} are atomic.
- * When the latch reaches the wait value, threads will block. The counter
latch can hence act like a
- * count down latch or a count up latch, while letting you keep track of the
counter as well.
- * This counter latch works opposite as the
java.util.concurrent.CountDownLatch, since the CounterLatch only blocks on a
single value and releases the threads on all other values.
- * @author fhanik
- * @see <a
href="http://download.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html">CountDownLatch</a>
- *
+ * Shared latch that allows the latch to be acquired a limited number of times
+ * after which all subsequent requests to acquire the latch will be placed in a
+ * FIFO queue until one of the shares is returned.
*/
-public class CounterLatch {
+public class LimitLatch {
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
@@ -41,139 +34,111 @@ public class CounterLatch {
}
@Override
- protected int tryAcquireShared(int arg) {
- return ((!released) && count.get() == signal) ? -1 : 1;
+ protected int tryAcquireShared(int ignored) {
+ long newCount = count.incrementAndGet();
+ if (!released && newCount > limit) {
+ // Limit exceeded
+ count.decrementAndGet();
+ return -1;
+ } else {
+ return 1;
+ }
}
@Override
protected boolean tryReleaseShared(int arg) {
+ count.decrementAndGet();
return true;
}
}
private final Sync sync;
private final AtomicLong count;
- private volatile long signal;
+ private volatile long limit;
private volatile boolean released = false;
/**
- * Instantiates a CounterLatch object with an initial value and a wait
value.
- * @param initial - initial value of the counter
- * @param waitValue - when the counter holds this value,
- * threads calling {@link #await()} or {@link #await(long, TimeUnit)}
- * will wait until the counter changes value or until they are
interrupted.
- */
- public CounterLatch(long initial, long waitValue) {
- this.signal = waitValue;
- this.count = new AtomicLong(initial);
+ * Instantiates a LimitLatch object with an initial limit.
+ * @param limit - maximum number of concurrent acquisitions of this latch
+ */
+ public LimitLatch(long limit) {
+ this.limit = limit;
+ this.count = new AtomicLong(0);
this.sync = new Sync();
}
/**
- * Causes the calling thread to wait if the counter holds the waitValue.
- * If the counter holds any other value, the thread will return
- * If the thread is interrupted or becomes interrupted an
InterruptedException is thrown
- * @throws InterruptedException
+ * Obtain the current limit.
*/
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
+ public long getLimit() {
+ return limit;
}
+
/**
- * Causes the calling thread to wait if the counter holds the waitValue.
- * If the counter holds any other value, the thread will return
- * If the thread is interrupted or becomes interrupted an
InterruptedException is thrown
- * @return true if the value changed, false if the timeout has elapsed
- * @throws InterruptedException
+ * Sets a new limit. If the limit is decreased there may be a period where
+ * more shares of the latch are acquired than the limit. In this case no
+ * more shares of the latch will be issued until sufficient shares have
been
+ * returned to reduce the number of acquired shares of the latch to below
+ * the new limit. If the limit is increased, threads currently in the queue
+ * may not be issued one of the newly available shares until the next
+ * request is made for a latch.
+ *
+ * @param limit The new limit
*/
- public boolean await(long timeout, TimeUnit unit) throws
InterruptedException {
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+ public void setLimit(long limit) {
+ this.limit = limit;
}
+
/**
- * Increments the counter
- * @return the previous counter value
+ * Acquires a shared latch if one is available or waits for one if no
shared
+ * latch is current available.
*/
- public long countUp() {
- long previous = count.getAndIncrement();
- if (previous == signal) {
- sync.releaseShared(0);
- }
- return previous;
+ public void countUpOrAwait() throws InterruptedException {
+ sync.acquireSharedInterruptibly(1);
}
/**
- * Decrements the counter
+ * Releases a shared latch, making it available for another thread to use.
* @return the previous counter value
*/
public long countDown() {
- long previous = count.getAndDecrement();
- if (previous == signal) {
- sync.releaseShared(0);
- }
- return previous;
+ sync.releaseShared(0);
+ return count.get();
}
/**
- * Returns the current counter value
- * @return the current counter value
+ * Releases all waiting threads and causes the {@link #limit} to be ignored
+ * until {@link #reset()} is called.
*/
- public long getCount() {
- return count.get();
+ public boolean releaseAll() {
+ released = true;
+ return sync.releaseShared(0);
}
/**
- * Performs an atomic update of the counter
- * If the operation is successful and {@code expect==waitValue &&
expect!=update} waiting threads will be released.
- * @param expect - the expected counter value
- * @param update - the new counter value
- * @return <code>true</code> if successful, <code>false</code> if the
- * current value wasn't as expected
- */
- public boolean compareAndSet(long expect, long update) {
- boolean result = count.compareAndSet(expect, update);
- if (result && expect==signal && expect != update) {
- sync.releaseShared(0);
- }
- return result;
+ * Resets the latch and initializes the shared acquisition counter to zero.
+ * @see #releaseAll()
+ */
+ public void reset() {
+ this.count.set(0);
+ released = false;
}
/**
- * returns true if there are threads blocked by this latch
- * @return true if there are threads blocked by this latch
+ * Returns <code>true</code> if there is at least one thread waiting to
+ * acquire the shared lock, otherwise returns <code>false</code>.
*/
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
-
+
/**
- * Returns a collection of the blocked threads
- * @return a collection of the blocked threads
+ * Provide access to the list of threads waiting to acquire this limited
+ * shared latch.
*/
public Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
-
- /**
- * releases all waiting threads. This operation is permanent, and no
threads will block,
- * even if the counter hits the {@code waitValue} until {@link
#reset(long)} has been called.
- * @return <code>true</code> if this release of shared mode may permit a
- * waiting acquire (shared or exclusive) to succeed; and
- * <code>false</code> otherwise
- */
- public boolean releaseAll() {
- released = true;
- return sync.releaseShared(0);
- }
-
- /**
- * Resets the latch and initializes the counter with the new value.
- * @param value the new counter value
- * @see #releaseAll()
- */
- public void reset(long value) {
- this.count.set(value);
- released = false;
- }
-
}
Copied: tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java
(from r1125824,
tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java)
URL:
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java?p2=tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java&p1=tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java&r1=1125824&r2=1127962&rev=1127962&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java
(original)
+++ tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java Thu
May 26 15:27:52 2011
@@ -18,108 +18,111 @@ package org.apache.tomcat.util.threads;
import junit.framework.TestCase;
-public class TestCounterLatch extends TestCase {
+public class TestLimitLatch extends TestCase {
- private volatile CounterLatch latch = null;
+ private volatile LimitLatch latch = null;
@Override
public void tearDown() {
- CounterLatch temp = latch;
+ LimitLatch temp = latch;
if (temp!=null) temp.releaseAll();
latch = null;
}
public void testNoThreads() throws Exception {
- latch = new CounterLatch(0,0);
- assertEquals("No threads should be waiting", false,
latch.hasQueuedThreads());
+ latch = new LimitLatch(0);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
}
public void testOneThreadNoWait() throws Exception {
- latch = new CounterLatch(0,1);
- assertEquals("No threads should be waiting", false,
latch.hasQueuedThreads());
- Thread testThread = new Thread() {
- @Override
- public void run() {
- try {
- latch.await();
- } catch (InterruptedException x) {
- x.printStackTrace();
- }
- }
- };
+ latch = new LimitLatch(1);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ Thread testThread = new TestThread();
testThread.start();
Thread.sleep(50);
- assertEquals("0 threads should be waiting", 0,
latch.getQueuedThreads().size());
- latch.countUp();
+ assertEquals("0 threads should be waiting", 0,
+ latch.getQueuedThreads().size());
+ latch.countUpOrAwait();
Thread.sleep(50);
- assertEquals("No threads should be waiting", false,
latch.hasQueuedThreads());
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
}
public void testOneThreadWaitCountUp() throws Exception {
- latch = new CounterLatch(0,1);
- assertEquals("No threads should be waiting", false,
latch.hasQueuedThreads());
- Thread testThread = new Thread() {
- @Override
- public void run() {
- try {
- latch.await();
- } catch (InterruptedException x) {
- x.printStackTrace();
- }
- }
- };
- latch.countUp();
- testThread.start();
- Thread.sleep(50);
- assertEquals("1 threads should be waiting", 1,
latch.getQueuedThreads().size());
- latch.countUp();
- Thread.sleep(50);
- assertEquals("No threads should be waiting", false,
latch.hasQueuedThreads());
- }
-
- public void testOneThreadWaitCountDown() throws Exception {
- latch = new CounterLatch(1,0);
- assertEquals("No threads should be waiting", false,
latch.hasQueuedThreads());
- Thread testThread = new Thread() {
- @Override
- public void run() {
- try {
- //System.out.println("Entering
["+Thread.currentThread().getName()+"]");
- latch.await();
- } catch (InterruptedException x) {
- x.printStackTrace();
- }
- //System.out.println("Exiting
["+Thread.currentThread().getName()+"]");
- }
- };
- latch.countDown();
+ latch = new LimitLatch(1);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ Thread testThread = new TestThread();
+ latch.countUpOrAwait();
testThread.start();
Thread.sleep(50);
- assertEquals("1 threads should be waiting", 1,
latch.getQueuedThreads().size());
+ assertEquals("1 threads should be waiting", 1,
+ latch.getQueuedThreads().size());
latch.countDown();
Thread.sleep(50);
- assertEquals("No threads should be waiting", false,
latch.hasQueuedThreads());
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
}
-
+
public void testOneRelease() throws Exception {
- latch = new CounterLatch(1,0);
- assertEquals("No threads should be waiting", false,
latch.hasQueuedThreads());
- Thread testThread = new Thread() {
- @Override
- public void run() {
- try {
- latch.await();
- } catch (InterruptedException x) {
- x.printStackTrace();
- }
- }
- };
- latch.countDown();
+ latch = new LimitLatch(1);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ Thread testThread = new TestThread();
+ latch.countUpOrAwait();
testThread.start();
Thread.sleep(50);
- assertEquals("1 threads should be waiting", 1,
latch.getQueuedThreads().size());
+ assertEquals("1 threads should be waiting", 1,
+ latch.getQueuedThreads().size());
latch.releaseAll();
Thread.sleep(50);
- assertEquals("No threads should be waiting", false,
latch.hasQueuedThreads());
- }
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ }
+
+ public void testTenWait() throws Exception {
+ latch = new LimitLatch(10);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ Thread[] testThread = new TestThread[30];
+ for (int i = 0; i < 30; i++) {
+ testThread[i] = new TestThread(1000);
+ testThread[i].start();
+ }
+ Thread.sleep(50);
+ assertEquals("20 threads should be waiting", 20,
+ latch.getQueuedThreads().size());
+ Thread.sleep(1000);
+ assertEquals("10 threads should be waiting", 10,
+ latch.getQueuedThreads().size());
+ Thread.sleep(1000);
+ assertEquals("No threads should be waiting", false,
+ latch.hasQueuedThreads());
+ }
+
+ private class TestThread extends Thread {
+
+ private int holdTime;
+
+ public TestThread() {
+ this(100);
+ }
+
+ public TestThread(int holdTime) {
+ this.holdTime = holdTime;
+ }
+
+ @Override
+ public void run() {
+ try {
+ latch.countUpOrAwait();
+ Thread.sleep(holdTime);
+ latch.countDown();
+ } catch (InterruptedException x) {
+ x.printStackTrace();
+ }
+ }
+ }
}
Modified: tomcat/trunk/webapps/docs/changelog.xml
URL:
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1127962&r1=1127961&r2=1127962&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Thu May 26 15:27:52 2011
@@ -82,6 +82,10 @@
Include a comment header in generated java files that indicates when
the
file was generated and which version of Tomcat generated it. (markt)
</add>
+ <fix>
+ <bug>51240</bug>: Ensure that maxConnections limit is enforced when
+ multiple acceptor threads are configured. (markt)
+ </fix>
</changelog>
</subsection>
<subsection name="Cluster">
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]