Author: markt Date: Tue Mar 15 20:17:51 2016 New Revision: 1735161 URL: http://svn.apache.org/viewvc?rev=1735161&view=rev Log: POOL-303 Ensure that threads do not block indefinitely if more than maxTotal threads try to borrow an object at the same time and the factory fails to create any objects.
Modified: commons/proper/pool/trunk/src/changes/changes.xml commons/proper/pool/trunk/src/main/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java commons/proper/pool/trunk/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java commons/proper/pool/trunk/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java Modified: commons/proper/pool/trunk/src/changes/changes.xml URL: http://svn.apache.org/viewvc/commons/proper/pool/trunk/src/changes/changes.xml?rev=1735161&r1=1735160&r2=1735161&view=diff ============================================================================== --- commons/proper/pool/trunk/src/changes/changes.xml (original) +++ commons/proper/pool/trunk/src/changes/changes.xml Tue Mar 15 20:17:51 2016 @@ -51,6 +51,11 @@ The <action> type attribute can be add,u Ensure BaseGenericObjectPool.IdentityWrapper#equals() follows the expected contract for equals(). </action> + <action dev="markt" issue="POOL-303" type="fix"> + Ensure that threads do not block indefinitely if more than maxTotal + threads try to borrow an object at the same time and the factory fails to + create any objects. + </action> </release> <release version="2.4.2" date="2015-08-01" description= "This is a patch release, including bug fixes only."> Modified: commons/proper/pool/trunk/src/main/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java URL: http://svn.apache.org/viewvc/commons/proper/pool/trunk/src/main/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java?rev=1735161&r1=1735160&r2=1735161&view=diff ============================================================================== --- commons/proper/pool/trunk/src/main/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java (original) +++ commons/proper/pool/trunk/src/main/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java Tue Mar 15 20:17:51 2016 @@ -476,7 +476,7 @@ public class GenericKeyedObjectPool<K,T> throw new IllegalStateException( "Returned object not currently part of this pool"); } - + synchronized(p) { final PooledObjectState state = p.getState(); if (state != PooledObjectState.ALLOCATED) { @@ -907,7 +907,7 @@ public class GenericKeyedObjectPool<K,T> if (objectDeque == null) { continue; } - + final Deque<PooledObject<T>> idleObjects = objectDeque.getIdleObjects(); evictionIterator = new EvictionIterator(idleObjects); if (evictionIterator.hasNext()) { @@ -1004,6 +1004,8 @@ public class GenericKeyedObjectPool<K,T> final int maxTotalPerKeySave = getMaxTotalPerKey(); // Per key final int maxTotal = getMaxTotal(); // All keys + final ObjectDeque<T> objectDeque = poolMap.get(key); + // Check against the overall limit boolean loop = true; @@ -1012,6 +1014,9 @@ public class GenericKeyedObjectPool<K,T> if (maxTotal > -1 && newNumTotal > maxTotal) { numTotal.decrementAndGet(); if (getNumIdle() == 0) { + // POOL-303. There may be threads waiting on an object + // return that isn't going to happen. Unblock them. + objectDeque.idleObjects.interuptTakeWaiters(); return null; } clearOldest(); @@ -1020,7 +1025,6 @@ public class GenericKeyedObjectPool<K,T> } } - final ObjectDeque<T> objectDeque = poolMap.get(key); final long newCreateCount = objectDeque.getCreateCount().incrementAndGet(); // Check against the per key limit @@ -1028,6 +1032,9 @@ public class GenericKeyedObjectPool<K,T> newCreateCount > Integer.MAX_VALUE) { numTotal.decrementAndGet(); objectDeque.getCreateCount().decrementAndGet(); + // POOL-303. There may be threads waiting on an object return that + // isn't going to happen. Unblock them. + objectDeque.idleObjects.interuptTakeWaiters(); return null; } @@ -1038,6 +1045,9 @@ public class GenericKeyedObjectPool<K,T> } catch (final Exception e) { numTotal.decrementAndGet(); objectDeque.getCreateCount().decrementAndGet(); + // POOL-303. There may be threads waiting on an object return that + // isn't going to happen. Unblock them. + objectDeque.idleObjects.interuptTakeWaiters(); throw e; } @@ -1433,7 +1443,7 @@ public class GenericKeyedObjectPool<K,T> /* * The map is keyed on pooled instances, wrapped to ensure that - * they work properly as keys. + * they work properly as keys. */ private final Map<IdentityWrapper<S>, PooledObject<S>> allObjects = new ConcurrentHashMap<IdentityWrapper<S>, PooledObject<S>>(); Modified: commons/proper/pool/trunk/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java URL: http://svn.apache.org/viewvc/commons/proper/pool/trunk/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java?rev=1735161&r1=1735160&r2=1735161&view=diff ============================================================================== --- commons/proper/pool/trunk/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java (original) +++ commons/proper/pool/trunk/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java Tue Mar 15 20:17:51 2016 @@ -856,6 +856,9 @@ public class GenericObjectPool<T> extend if (localMaxTotal > -1 && newCreateCount > localMaxTotal || newCreateCount > Integer.MAX_VALUE) { createCount.decrementAndGet(); + // POOL-303. There may be threads waiting on an object return that + // isn't going to happen. Unblock them. + idleObjects.interuptTakeWaiters(); return null; } @@ -864,6 +867,9 @@ public class GenericObjectPool<T> extend p = factory.makeObject(); } catch (final Exception e) { createCount.decrementAndGet(); + // POOL-303. There may be threads waiting on an object return that + // isn't going to happen. Unblock them. + idleObjects.interuptTakeWaiters(); throw e; } Modified: commons/proper/pool/trunk/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java URL: http://svn.apache.org/viewvc/commons/proper/pool/trunk/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java?rev=1735161&r1=1735160&r2=1735161&view=diff ============================================================================== --- commons/proper/pool/trunk/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java (original) +++ commons/proper/pool/trunk/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java Tue Mar 15 20:17:51 2016 @@ -33,6 +33,7 @@ import java.util.Random; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; @@ -2397,11 +2398,11 @@ public class TestGenericObjectPool exten Assert.assertEquals(1, factory.validateCounter); } - + /** * Verifies that when a factory's makeObject produces instances that are not - * discernible by equals, the pool can handle them. - * + * discernible by equals, the pool can handle them. + * * JIRA: POOL-283 */ @Test @@ -2415,11 +2416,11 @@ public class TestGenericObjectPool exten pool.returnObject(s2); pool.close(); } - + /** * Verifies that when a borrowed object is mutated in a way that does not * preserve equality and hashcode, the pool can recognized it on return. - * + * * JIRA: POOL-284 */ @Test @@ -2435,11 +2436,11 @@ public class TestGenericObjectPool exten pool.returnObject(s2); pool.close(); } - + /** * Verifies that returning an object twice (without borrow in between) causes ISE * but does not re-validate or re-passivate the instance. - * + * * JIRA: POOL-285 */ @Test @@ -2460,7 +2461,7 @@ public class TestGenericObjectPool exten Assert.assertEquals(1, waiter.getPassivationCount()); } } - + public void testPreparePool() throws Exception { pool.setMinIdle(1); pool.setMaxTotal(1); @@ -2486,13 +2487,13 @@ public class TestGenericObjectPool exten return new DefaultPooledObject<Object>(value); } } - - /** + + /** * Factory that creates HashSets. Note that this means * 0) All instances are initially equal (not discernible by equals) * 1) Instances are mutable and mutation can cause change in identity / hashcode. */ - private static final class HashSetFactory + private static final class HashSetFactory extends BasePooledObjectFactory<HashSet<String>> { @Override public HashSet<String> create() throws Exception { @@ -2544,4 +2545,74 @@ public class TestGenericObjectPool exten } } } + + @Test + public void testFailingFactoryDoesNotBlockThreads() throws Exception { + + final CreateFailFactory factory = new CreateFailFactory(); + final GenericObjectPool<String> createFailFactoryPool = + new GenericObjectPool<String>(factory); + + createFailFactoryPool.setMaxTotal(1); + + // Try and borrow the first object from the pool + final WaitingTestThread thread1 = new WaitingTestThread(createFailFactoryPool, 0); + thread1.start(); + + // Wait for thread to reach semaphore + while(!factory.hasQueuedThreads()) { + Thread.sleep(200); + } + + // Try and borrow the second object from the pool + final WaitingTestThread thread2 = new WaitingTestThread(createFailFactoryPool, 0); + thread2.start(); + // Pool will not call factory since maximum number of object creations + // are already queued. + + // Thread 2 will wait on an object being returned to the pool + // Give thread 2 a chance to reach this state + Thread.sleep(1000); + + // Release thread1 + factory.release(); + // Pre-release thread2 + factory.release(); + + // Both threads should now complete. + boolean threadRunning = true; + int count = 0; + while (threadRunning && count < 15) { + threadRunning = thread1.isAlive(); + threadRunning = thread2.isAlive(); + Thread.sleep(200); + count++; + } + Assert.assertFalse(thread1.isAlive()); + Assert.assertFalse(thread2.isAlive()); + } + + private static class CreateFailFactory extends BasePooledObjectFactory<String> { + + private final Semaphore semaphore = new Semaphore(0); + + @Override + public String create() throws Exception { + semaphore.acquire(); + throw new Exception(); + } + + @Override + public PooledObject<String> wrap(String obj) { + return new DefaultPooledObject<String>(obj); + } + + public void release() { + semaphore.release(); + } + + public boolean hasQueuedThreads() { + return semaphore.hasQueuedThreads(); + } + } }