Author: markt
Date: Thu May 26 15:27:52 2011
New Revision: 1127962

URL: http://svn.apache.org/viewvc?rev=1127962&view=rev
Log:
Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=51240
Replace the more generic CounterLatch (that has concurrency issues) with a more 
specific LimitLatch that (mostly) only provides the functionality required by 
the connectors to implement maxConnections. It also adds support for 
dynamically modifying maxConnections.

Added:
    tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java
      - copied, changed from r1125824, 
tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java
    tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java
      - copied, changed from r1125824, 
tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java
Removed:
    tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java
    tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java
Modified:
    tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=1127962&r1=1127961&r2=1127962&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Thu May 
26 15:27:52 2011
@@ -31,7 +31,7 @@ import javax.net.ssl.KeyManagerFactory;
 import org.apache.juli.logging.Log;
 import org.apache.tomcat.util.IntrospectionUtils;
 import org.apache.tomcat.util.res.StringManager;
-import org.apache.tomcat.util.threads.CounterLatch;
+import org.apache.tomcat.util.threads.LimitLatch;
 import org.apache.tomcat.util.threads.ResizableExecutor;
 import org.apache.tomcat.util.threads.TaskQueue;
 import org.apache.tomcat.util.threads.TaskThreadFactory;
@@ -97,7 +97,7 @@ public abstract class AbstractEndpoint {
     /**
      * counter for nr of connections handled by an endpoint
      */
-    private volatile CounterLatch connectionCounterLatch = null;
+    private volatile LimitLatch connectionLimitLatch = null;
 
     /**
      * Socket properties
@@ -111,7 +111,13 @@ public abstract class AbstractEndpoint {
     // ----------------------------------------------------------------- 
Properties
 
     private int maxConnections = 10000;
-    public void setMaxConnections(int maxCon) { this.maxConnections = maxCon; }
+    public void setMaxConnections(int maxCon) {
+        this.maxConnections = maxCon;
+        LimitLatch latch = this.connectionLimitLatch;
+        // Update the latch that enforces this. The latch is null until
+        // initializeConnectionLatch() runs and after releaseConnectionLatch().
+        if (latch!=null) latch.setLimit(maxCon);
+    }
+
     public int  getMaxConnections() { return this.maxConnections; }
     /**
      * External Executor based thread pool.
@@ -550,32 +556,26 @@ public abstract class AbstractEndpoint {
     protected abstract Log getLog();
     public abstract boolean getUseSendfile();
     
-    protected CounterLatch initializeConnectionLatch() {
-        if (connectionCounterLatch==null) {
-            connectionCounterLatch = new CounterLatch(0,getMaxConnections());
+    protected LimitLatch initializeConnectionLatch() {
+        if (connectionLimitLatch==null) {
+            connectionLimitLatch = new LimitLatch(getMaxConnections());
         }
-        return connectionCounterLatch;
+        return connectionLimitLatch;
     }
     
     protected void releaseConnectionLatch() {
-        CounterLatch latch = connectionCounterLatch;
+        LimitLatch latch = connectionLimitLatch;
         if (latch!=null) latch.releaseAll();
-        connectionCounterLatch = null;
-    }
-    
-    protected void awaitConnection() throws InterruptedException {
-        CounterLatch latch = connectionCounterLatch;
-        if (latch!=null) latch.await();
+        connectionLimitLatch = null;
     }
     
-    protected long countUpConnection() {
-        CounterLatch latch = connectionCounterLatch;
-        if (latch!=null) return latch.countUp();
-        else return -1;
+    protected void countUpOrAwaitConnection() throws InterruptedException {
+        LimitLatch latch = connectionLimitLatch;
+        if (latch!=null) latch.countUpOrAwait();
     }
     
     protected long countDownConnection() {
-        CounterLatch latch = connectionCounterLatch;
+        LimitLatch latch = connectionLimitLatch;
         if (latch!=null) {
             long result = latch.countDown();
             if (result<0) {

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1127962&r1=1127961&r2=1127962&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu May 26 
15:27:52 2011
@@ -949,7 +949,7 @@ public class AprEndpoint extends Abstrac
                 }
                 try {
                     //if we have reached max connections, wait
-                    awaitConnection();
+                    countUpOrAwaitConnection();
                     
                     long socket = 0;
                     try {
@@ -965,8 +965,6 @@ public class AprEndpoint extends Abstrac
                     // Successful accept, reset the error delay
                     errorDelay = 0;
 
-                    //increment socket count
-                    countUpConnection();
                     /*
                      * In the case of a deferred accept unlockAccept needs to
                      * send data. This data will be rubbish, so destroy the

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java?rev=1127962&r1=1127961&r2=1127962&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java Thu May 26 
15:27:52 2011
@@ -211,8 +211,8 @@ public class JIoEndpoint extends Abstrac
                 }
                 try {
                     //if we have reached max connections, wait
-                    awaitConnection();
-
+                    countUpOrAwaitConnection();
+                    
                     Socket socket = null;
                     try {
                         // Accept the next incoming connection from the server
@@ -237,8 +237,6 @@ public class JIoEndpoint extends Abstrac
                             } catch (IOException e) {
                                 // Ignore
                             }
-                        } else {
-                            countUpConnection();
                         }
                     } else {
                         // Close socket right away

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1127962&r1=1127961&r2=1127962&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu May 26 
15:27:52 2011
@@ -761,7 +761,7 @@ public class NioEndpoint extends Abstrac
                 }
                 try {
                     //if we have reached max connections, wait
-                    awaitConnection();
+                    countUpOrAwaitConnection();
                     
                     SocketChannel socket = null;
                     try {
@@ -791,8 +791,6 @@ public class NioEndpoint extends Abstrac
                                 if (log.isDebugEnabled())
                                     log.debug("", ix);
                             }
-                        } else {
-                            countUpConnection();
                         }
                     }
                 } catch (SocketTimeoutException sx) {

Copied: tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java (from 
r1125824, tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java)
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java?p2=tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java&p1=tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java&r1=1125824&r2=1127962&rev=1127962&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/threads/LimitLatch.java Thu May 26 
15:27:52 2011
@@ -17,22 +17,15 @@
 package org.apache.tomcat.util.threads;
 
 import java.util.Collection;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
 /**
- * Simple counter latch that allows code to keep an up and down counter, and 
waits while the latch holds a certain wait value.
- * and threads using the latch to wait if the count has reached a certain 
value.
- * The counter latch can be used to keep track of an atomic counter, since the 
operations {@link #countDown()}
- * and {@link #countUp()} are atomic.
- * When the latch reaches the wait value, threads will block. The counter 
latch can hence act like a 
- * count down latch or a count up latch, while letting you keep track of the 
counter as well.
- * This counter latch works opposite as the 
java.util.concurrent.CountDownLatch, since the CounterLatch only blocks on a 
single value and releases the threads on all other values.
- * @author fhanik
- * @see <a 
href="http://download.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html">CountDownLatch</a>
- *
+ * Shared latch that allows the latch to be acquired a limited number of times
+ * after which all subsequent requests to acquire the latch will be placed in a
+ * FIFO queue until one of the shares is returned.
  */
-public class CounterLatch {
+public class LimitLatch {
 
     private class Sync extends AbstractQueuedSynchronizer {
         private static final long serialVersionUID = 1L;
@@ -41,139 +34,111 @@ public class CounterLatch {
         }
 
         @Override
-        protected int tryAcquireShared(int arg) {
-            return ((!released) && count.get() == signal) ? -1 : 1;
+        protected int tryAcquireShared(int ignored) {
+            long newCount = count.incrementAndGet();
+            if (!released && newCount > limit) {
+                // Limit exceeded
+                count.decrementAndGet();
+                return -1;
+            } else {
+                return 1;
+            }
         }
 
         @Override
         protected boolean tryReleaseShared(int arg) {
+            count.decrementAndGet();
             return true;
         }
     }
 
     private final Sync sync;
     private final AtomicLong count;
-    private volatile long signal;
+    private volatile long limit;
     private volatile boolean released = false;
     
     /**
-     * Instantiates a CounterLatch object with an initial value and a wait 
value.
-     * @param initial - initial value of the counter
-     * @param waitValue - when the counter holds this value, 
-     * threads calling {@link #await()} or {@link #await(long, TimeUnit)} 
-     * will wait until the counter changes value or until they are 
interrupted.  
-     */
-    public CounterLatch(long initial, long waitValue) {
-        this.signal = waitValue;
-        this.count = new AtomicLong(initial);
+     * Instantiates a LimitLatch object with an initial limit.
+     * @param limit - maximum number of concurrent acquisitions of this latch
+     */
+    public LimitLatch(long limit) {
+        this.limit = limit;
+        this.count = new AtomicLong(0);
         this.sync = new Sync();
     }
 
     /**
-     * Causes the calling thread to wait if the counter holds the waitValue.
-     * If the counter holds any other value, the thread will return
-     * If the thread is interrupted or becomes interrupted an 
InterruptedException is thrown
-     * @throws InterruptedException
+     * Obtain the current limit.
      */
-    public void await() throws InterruptedException {
-        sync.acquireSharedInterruptibly(1);
+    public long getLimit() {
+        return limit;
     }
 
+
     /**
-     * Causes the calling thread to wait if the counter holds the waitValue.
-     * If the counter holds any other value, the thread will return
-     * If the thread is interrupted or becomes interrupted an 
InterruptedException is thrown
-     * @return true if the value changed, false if the timeout has elapsed
-     * @throws InterruptedException
+     * Sets a new limit. If the limit is decreased there may be a period where
+     * more shares of the latch are acquired than the limit. In this case no
+     * more shares of the latch will be issued until sufficient shares have 
been
+     * returned to reduce the number of acquired shares of the latch to below
+     * the new limit. If the limit is increased, threads currently in the queue
+     * may not be issued one of the newly available shares until the next
+     * request is made for a latch.
+     * 
+     * @param limit The new limit
      */
-    public boolean await(long timeout, TimeUnit unit) throws 
InterruptedException {
-        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+    public void setLimit(long limit) {
+        this.limit = limit;
     }
 
+
     /**
-     * Increments the counter
-     * @return the previous counter value
+     * Acquires a shared latch if one is available or waits for one if no 
shared
+     * latch is currently available.
      */
-    public long countUp() {
-        long previous = count.getAndIncrement();
-        if (previous == signal) {
-            sync.releaseShared(0);
-        }
-        return previous;
+    public void countUpOrAwait() throws InterruptedException {
+        sync.acquireSharedInterruptibly(1);
     }
 
     /**
-     * Decrements the counter
+     * Releases a shared latch, making it available for another thread to use.
      * @return the previous counter value
      */
     public long countDown() {
-        long previous = count.getAndDecrement();
-        if (previous == signal) {
-            sync.releaseShared(0);
-        }
-        return previous;
+        sync.releaseShared(0);
+        return count.get();
     }
     
     /**
-     * Returns the current counter value
-     * @return the current counter value
+     * Releases all waiting threads and causes the {@link #limit} to be ignored
+     * until {@link #reset()} is called.
      */
-    public long getCount() {
-        return count.get();
+    public boolean releaseAll() {
+        released = true;
+        return sync.releaseShared(0);
     }
     
     /**
-     * Performs an atomic update of the counter 
-     * If the operation is successful and {@code expect==waitValue && 
expect!=update} waiting threads will be released.  
-     * @param expect - the expected counter value
-     * @param update - the new counter value
-     * @return <code>true</code> if successful, <code>false</code> if the
-     *         current value wasn't as expected
-     */
-    public boolean compareAndSet(long expect, long update) {
-        boolean result = count.compareAndSet(expect, update);
-        if (result && expect==signal && expect != update) {
-            sync.releaseShared(0);
-        }
-        return result;
+     * Resets the latch and initializes the shared acquisition counter to zero.
+     * @see #releaseAll()
+     */
+    public void reset() {
+        this.count.set(0);
+        released = false;
     }
     
     /**
-     * returns true if there are threads blocked by this latch
-     * @return true if there are threads blocked by this latch
+     * Returns <code>true</code> if there is at least one thread waiting to
+     * acquire the shared lock, otherwise returns <code>false</code>.
      */
     public boolean hasQueuedThreads() {
         return sync.hasQueuedThreads();
     }
-    
+
     /**
-     * Returns a collection of the blocked threads
-     * @return a collection of the blocked threads
+     * Provide access to the list of threads waiting to acquire this limited
+     * shared latch.
      */
     public Collection<Thread> getQueuedThreads() {
         return sync.getQueuedThreads();
     }
-    
-    /**
-     * releases all waiting threads. This operation is permanent, and no 
threads will block,
-     * even if the counter hits the {@code waitValue} until {@link 
#reset(long)} has been called.
-     * @return <code>true</code> if this release of shared mode may permit a
-     *         waiting acquire (shared or exclusive) to succeed; and
-     *         <code>false</code> otherwise
-     */
-    public boolean releaseAll() {
-        released = true;
-        return sync.releaseShared(0);
-    }
-    
-    /**
-     * Resets the latch and initializes the counter with the new value.
-     * @param value the new counter value
-     * @see #releaseAll()
-     */
-    public void reset(long value) {
-        this.count.set(value);
-        released = false;
-    }
-
 }

Copied: tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java 
(from r1125824, 
tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java)
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java?p2=tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java&p1=tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java&r1=1125824&r2=1127962&rev=1127962&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java 
(original)
+++ tomcat/trunk/test/org/apache/tomcat/util/threads/TestLimitLatch.java Thu 
May 26 15:27:52 2011
@@ -18,108 +18,111 @@ package org.apache.tomcat.util.threads;
 
 import junit.framework.TestCase;
 
-public class TestCounterLatch extends TestCase {
+public class TestLimitLatch extends TestCase {
 
-    private volatile CounterLatch latch = null;
+    private volatile LimitLatch latch = null;
 
     @Override
     public void tearDown() {
-        CounterLatch temp = latch;
+        LimitLatch temp = latch;
         if (temp!=null) temp.releaseAll();
         latch = null;
     }
 
     public void testNoThreads() throws Exception {
-        latch = new CounterLatch(0,0);
-        assertEquals("No threads should be waiting", false, 
latch.hasQueuedThreads());
+        latch = new LimitLatch(0);
+        assertEquals("No threads should be waiting", false,
+                latch.hasQueuedThreads());
     }
 
     public void testOneThreadNoWait() throws Exception {
-        latch = new CounterLatch(0,1);
-        assertEquals("No threads should be waiting", false, 
latch.hasQueuedThreads());
-        Thread testThread = new Thread() {
-            @Override
-            public void run() {
-                try {
-                    latch.await();
-                } catch (InterruptedException x) {
-                    x.printStackTrace();
-                }
-            }
-        };
+        latch = new LimitLatch(1);
+        assertEquals("No threads should be waiting", false,
+                latch.hasQueuedThreads());
+        Thread testThread = new TestThread();
         testThread.start();
         Thread.sleep(50);
-        assertEquals("0 threads should be waiting", 0, 
latch.getQueuedThreads().size());
-        latch.countUp();
+        assertEquals("0 threads should be waiting", 0,
+                latch.getQueuedThreads().size());
+        latch.countUpOrAwait();
         Thread.sleep(50);
-        assertEquals("No threads should be waiting", false, 
latch.hasQueuedThreads());
+        assertEquals("No threads should be waiting", false,
+                latch.hasQueuedThreads());
     }
 
     public void testOneThreadWaitCountUp() throws Exception {
-        latch = new CounterLatch(0,1);
-        assertEquals("No threads should be waiting", false, 
latch.hasQueuedThreads());
-        Thread testThread = new Thread() {
-            @Override
-            public void run() {
-                try {
-                    latch.await();
-                } catch (InterruptedException x) {
-                    x.printStackTrace();
-                }
-            }
-        };
-        latch.countUp();
-        testThread.start();
-        Thread.sleep(50);
-        assertEquals("1 threads should be waiting", 1, 
latch.getQueuedThreads().size());
-        latch.countUp();
-        Thread.sleep(50);
-        assertEquals("No threads should be waiting", false, 
latch.hasQueuedThreads());
-    }
-
-    public void testOneThreadWaitCountDown() throws Exception {
-        latch = new CounterLatch(1,0);
-        assertEquals("No threads should be waiting", false, 
latch.hasQueuedThreads());
-        Thread testThread = new Thread() {
-            @Override
-            public void run() {
-                try {
-                    //System.out.println("Entering 
["+Thread.currentThread().getName()+"]");
-                    latch.await();
-                } catch (InterruptedException x) {
-                    x.printStackTrace();
-                }
-                //System.out.println("Exiting 
["+Thread.currentThread().getName()+"]");
-            }
-        };
-        latch.countDown();
+        latch = new LimitLatch(1);
+        assertEquals("No threads should be waiting", false,
+                latch.hasQueuedThreads());
+        Thread testThread = new TestThread();
+        latch.countUpOrAwait();
         testThread.start();
         Thread.sleep(50);
-        assertEquals("1 threads should be waiting", 1, 
latch.getQueuedThreads().size());
+        assertEquals("1 threads should be waiting", 1,
+                latch.getQueuedThreads().size());
         latch.countDown();
         Thread.sleep(50);
-        assertEquals("No threads should be waiting", false, 
latch.hasQueuedThreads());
+        assertEquals("No threads should be waiting", false,
+                latch.hasQueuedThreads());
     }
-    
+
     public void testOneRelease() throws Exception {
-        latch = new CounterLatch(1,0);
-        assertEquals("No threads should be waiting", false, 
latch.hasQueuedThreads());
-        Thread testThread = new Thread() {
-            @Override
-            public void run() {
-                try {
-                    latch.await();
-                } catch (InterruptedException x) {
-                    x.printStackTrace();
-                }
-            }
-        };
-        latch.countDown();
+        latch = new LimitLatch(1);
+        assertEquals("No threads should be waiting", false,
+                latch.hasQueuedThreads());
+        Thread testThread = new TestThread();
+        latch.countUpOrAwait();
         testThread.start();
         Thread.sleep(50);
-        assertEquals("1 threads should be waiting", 1, 
latch.getQueuedThreads().size());
+        assertEquals("1 threads should be waiting", 1,
+                latch.getQueuedThreads().size());
         latch.releaseAll();
         Thread.sleep(50);
-        assertEquals("No threads should be waiting", false, 
latch.hasQueuedThreads());
-    }    
+        assertEquals("No threads should be waiting", false,
+                latch.hasQueuedThreads());
+    }
+
+    public void testTenWait() throws Exception {
+        latch = new LimitLatch(10);
+        assertEquals("No threads should be waiting", false,
+                latch.hasQueuedThreads());
+        Thread[] testThread = new TestThread[30];
+        for (int i = 0; i < 30; i++) {
+            testThread[i] = new TestThread(1000);
+            testThread[i].start();
+        }
+        Thread.sleep(50);
+        assertEquals("20 threads should be waiting", 20,
+                latch.getQueuedThreads().size());
+        Thread.sleep(1000);
+        assertEquals("10 threads should be waiting", 10,
+                latch.getQueuedThreads().size());
+        Thread.sleep(1000);
+        assertEquals("No threads should be waiting", false,
+                latch.hasQueuedThreads());
+    }
+
+    private class TestThread extends Thread {
+        
+        private int holdTime;
+        
+        public TestThread() {
+            this(100);
+        }
+        
+        public TestThread(int holdTime) {
+            this.holdTime = holdTime;
+        }
+ 
+        @Override
+        public void run() {
+            try {
+                latch.countUpOrAwait();
+                Thread.sleep(holdTime);
+                latch.countDown();
+            } catch (InterruptedException x) {
+                x.printStackTrace();
+            }
+        }
+    }
 }

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1127962&r1=1127961&r2=1127962&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Thu May 26 15:27:52 2011
@@ -82,6 +82,10 @@
         Include a comment header in generated java files that indicates when 
the
         file was generated and which version of Tomcat generated it. (markt)
       </add>
+      <fix>
+        <bug>51240</bug>: Ensure that maxConnections limit is enforced when
+        multiple acceptor threads are configured. (markt)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Cluster">



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to