Author: markt Date: Sun May 17 22:58:25 2009 New Revision: 775775 URL: http://svn.apache.org/viewvc?rev=775775&view=rev Log: Fix POOL-75 for GKOP. Objects are now allocated to threads in the order in which the threads made their request.
Modified: commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java Modified: commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java URL: http://svn.apache.org/viewvc/commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java?rev=775775&r1=775774&r2=775775&view=diff ============================================================================== --- commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java (original) +++ commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericKeyedObjectPool.java Sun May 17 22:58:25 2009 @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -562,7 +563,7 @@ */ public synchronized void setMaxActive(int maxActive) { _maxActive = maxActive; - notifyAll(); + allocate(); } /** @@ -589,7 +590,7 @@ */ public synchronized void setMaxTotal(int maxTotal) { _maxTotal = maxTotal; - notifyAll(); + allocate(); } /** @@ -621,7 +622,7 @@ case WHEN_EXHAUSTED_FAIL: case WHEN_EXHAUSTED_GROW: _whenExhaustedAction = whenExhaustedAction; - notifyAll(); + allocate(); break; default: throw new IllegalArgumentException("whenExhaustedAction " + whenExhaustedAction + " not recognized."); @@ -693,7 +694,7 @@ */ public synchronized void setMaxIdle(int maxIdle) { _maxIdle = maxIdle; - notifyAll(); + allocate(); } /** @@ -944,51 +945,37 @@ public Object borrowObject(Object key) throws Exception { long starttime = System.currentTimeMillis(); + Latch latch = new Latch(key); + synchronized (this) { + _allocationQueue.add(latch); + allocate(); + } + for(;;) { - ObjectTimestampPair pair = null; - ObjectQueue pool = null; synchronized (this) { assertOpen(); - pool = (ObjectQueue)(_poolMap.get(key)); - if(null == pool) { - pool = new ObjectQueue(); - 
_poolMap.put(key,pool); - _poolList.add(key); - } - // if there are any sleeping, just grab one of those - try { - pair = (ObjectTimestampPair)(pool.queue.removeFirst()); - if(null != pair) { - _totalIdle--; - } - } catch(NoSuchElementException e) { /* ignored */ - } - // otherwise - if(null == pair) { - // if there is a totalMaxActive and we are at the limit then - // we have to make room - if ((_maxTotal > 0) - && (_totalActive + _totalIdle + _totalInternalProcessing >= _maxTotal)) { - clearOldest(); - } - - // check if we can create one - // (note we know that the num sleeping is 0, else we wouldn't be here) - if ((_maxActive < 0 || pool.activeCount + pool.internalProcessingCount < _maxActive) && - (_maxTotal < 0 || _totalActive + _totalIdle + _totalInternalProcessing < _maxTotal)) { + } + // If no object was allocated + if (null == latch._pair) { + // Check to see if we were allowed to create one + if (latch._mayCreate) { + // allow new object to be created + } else { + // the pool is exhausted + switch(_whenExhaustedAction) { + case WHEN_EXHAUSTED_GROW: // allow new object to be created - } else { - // the pool is exhausted - switch(_whenExhaustedAction) { - case WHEN_EXHAUSTED_GROW: - // allow new object to be created - break; - case WHEN_EXHAUSTED_FAIL: - throw new NoSuchElementException("Pool exhausted"); - case WHEN_EXHAUSTED_BLOCK: - try { + break; + case WHEN_EXHAUSTED_FAIL: + synchronized (this) { + _allocationQueue.remove(latch); + } + throw new NoSuchElementException("Pool exhausted"); + case WHEN_EXHAUSTED_BLOCK: + try { + synchronized (latch) { if(_maxWait <= 0) { - wait(); + latch.wait(); } else { // this code may be executed again after a notify then continue cycle // so, need to calculate the amount of time to wait @@ -996,38 +983,38 @@ final long waitTime = _maxWait - elapsed; if (waitTime > 0) { - wait(waitTime); + latch.wait(waitTime); } } - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - throw e; } - if(_maxWait > 0 
&& ((System.currentTimeMillis() - starttime) >= _maxWait)) { - throw new NoSuchElementException("Timeout waiting for idle object"); - } else { - continue; // keep looping + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; } - default: - throw new IllegalArgumentException("whenExhaustedAction " + _whenExhaustedAction + " not recognized."); - } + if(_maxWait > 0 && ((System.currentTimeMillis() - starttime) >= _maxWait)) { + throw new NoSuchElementException("Timeout waiting for idle object"); + } else { + continue; // keep looping + } + default: + throw new IllegalArgumentException("whenExhaustedAction " + _whenExhaustedAction + " not recognized."); } } - pool.incrementActiveCount(); } boolean newlyCreated = false; - if (null == pair) { + if (null == latch._pair) { try { Object obj = _factory.makeObject(key); - pair = new ObjectTimestampPair(obj); + latch._pair = new ObjectTimestampPair(obj); newlyCreated = true; } finally { if (!newlyCreated) { // object cannot be created synchronized (this) { - pool.decrementActiveCount(); - notifyAll(); + latch._pool.decrementInternalProcessingCount(); + // No need to reset latch - about to throw exception + allocate(); } } } @@ -1035,21 +1022,27 @@ // activate & validate the object try { - _factory.activateObject(key, pair.value); - if (_testOnBorrow && !_factory.validateObject(key, pair.value)) { + _factory.activateObject(key, latch._pair.value); + if (_testOnBorrow && !_factory.validateObject(key, latch._pair.value)) { throw new Exception("ValidateObject failed"); } - return pair.value; + synchronized (this) { + latch._pool.decrementInternalProcessingCount(); + latch._pool.incrementActiveCount(); + } + return latch._pair.value; } catch (Throwable e) { // object cannot be activated or is invalid try { - _factory.destroyObject(key,pair.value); + _factory.destroyObject(key, latch._pair.value); } catch (Throwable e2) { // cannot destroy broken object } synchronized (this) { - 
pool.decrementActiveCount(); - notifyAll(); + latch._pool.decrementInternalProcessingCount(); + latch.reset(); + _allocationQueue.add(0, latch); + allocate(); } if(newlyCreated) { throw new NoSuchElementException( @@ -1063,6 +1056,58 @@ } } + private synchronized void allocate() { + if (isClosed()) return; + + for (;;) { + if (!_allocationQueue.isEmpty()) { + // First use any objects in the pool to clear the queue + Latch latch = (Latch) _allocationQueue.getFirst(); + ObjectQueue pool = (ObjectQueue)(_poolMap.get(latch._key)); + if(null == pool) { + pool = new ObjectQueue(); + _poolMap.put(latch._key, pool); + _poolList.add(latch._key); + } + latch._pool = pool; + if (!pool.queue.isEmpty()) { + _allocationQueue.removeFirst(); + latch._pair = (ObjectTimestampPair) pool.queue.removeFirst(); + pool.incrementInternalProcessingCount(); + _totalIdle--; + synchronized (latch) { + latch.notify(); + } + // Next item in queue + continue; + } + + // If there is a totalMaxActive and we are at the limit then + // we have to make room + if ((_maxTotal > 0) + && (_totalActive + _totalIdle + _totalInternalProcessing >= _maxTotal)) { + clearOldest(); + } + + + // Second utilise any spare capacity to create new objects + if ((_maxActive < 0 || pool.activeCount + pool.internalProcessingCount < _maxActive) && + (_maxTotal < 0 || _totalActive + _totalIdle + _totalInternalProcessing < _maxTotal)) { + // allow new object to be created + _allocationQueue.removeFirst(); + latch._mayCreate = true; + pool.incrementInternalProcessingCount(); + synchronized (latch) { + latch.notify(); + } + // Next item in queue + continue; + } + } + break; + } + } + /** * Clears the pool, removing all pooled instances. 
*/ @@ -1184,7 +1229,7 @@ } finally { synchronized(this) { _totalInternalProcessing--; - notifyAll(); + allocate(); } } } @@ -1250,7 +1295,7 @@ if (pool != null) { synchronized(this) { pool.decrementActiveCount(); - notifyAll(); + allocate(); } } } @@ -1301,7 +1346,7 @@ if (decrementNumActive) { pool.decrementActiveCount(); } - notifyAll(); + allocate(); } } } @@ -1317,7 +1362,7 @@ if (decrementNumActive) { synchronized(this) { pool.decrementActiveCount(); - notifyAll(); + allocate(); } } } @@ -1335,7 +1380,7 @@ _poolList.add(key); } pool.decrementActiveCount(); - notifyAll(); // _totalActive has changed + allocate(); // _totalActive has changed } } } @@ -1696,7 +1741,7 @@ } finally { synchronized (this) { pool.decrementInternalProcessingCount(); - notifyAll(); + allocate(); } } } @@ -1941,6 +1986,31 @@ public boolean lifo = GenericKeyedObjectPool.DEFAULT_LIFO; } + /** + * Latch used to control allocation order of objects to threads to ensure + * fairness. ie objects are allocated to threads in the order that threads + * request objects. + */ + private static final class Latch { + Object _key; + ObjectQueue _pool; + ObjectTimestampPair _pair; + boolean _mayCreate = false; + + private Latch(Object key) { + _key = key; + } + + /** + * Reset the latch data. Used when an allocation fails and the latch + * needs to be re-added to the queue. + */ + private void reset() { + _pair = null; + _mayCreate = false; + } + } + //--- protected attributes --------------------------------------- /** @@ -2115,4 +2185,12 @@ /** Whether or not the pools behave as LIFO queues (last in first out) */ private boolean _lifo = DEFAULT_LIFO; + + /** + * Used to track the order in which threads call {@link #borrowObject()} so + * that objects can be allocated in the order in which the threads requested + * them. + */ + private LinkedList _allocationQueue = new LinkedList(); + }