Author: markt Date: Sun May 17 16:42:34 2009 New Revision: 775707 URL: http://svn.apache.org/viewvc?rev=775707&view=rev Log: Fix POOL-75 for GOP. Ensure objects are allocated in the order in which threads ask for them.
Modified: commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java Modified: commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java URL: http://svn.apache.org/viewvc/commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java?rev=775707&r1=775706&r2=775707&view=diff ============================================================================== --- commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java (original) +++ commons/proper/pool/trunk/src/java/org/apache/commons/pool/impl/GenericObjectPool.java Sun May 17 16:42:34 2009 @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; import java.util.TimerTask; @@ -539,7 +540,7 @@ */ public synchronized void setMaxActive(int maxActive) { _maxActive = maxActive; - notifyAll(); + allocate(); } /** @@ -570,7 +571,7 @@ case WHEN_EXHAUSTED_FAIL: case WHEN_EXHAUSTED_GROW: _whenExhaustedAction = whenExhaustedAction; - notifyAll(); + allocate(); break; default: throw new IllegalArgumentException("whenExhaustedAction " + whenExhaustedAction + " not recognized."); @@ -614,7 +615,7 @@ */ public synchronized void setMaxWait(long maxWait) { _maxWait = maxWait; - notifyAll(); + allocate(); } /** @@ -641,7 +642,7 @@ */ public synchronized void setMaxIdle(int maxIdle) { _maxIdle = maxIdle; - notifyAll(); + allocate(); } /** @@ -658,7 +659,7 @@ */ public synchronized void setMinIdle(int minIdle) { _minIdle = minIdle; - notifyAll(); + allocate(); } /** @@ -917,43 +918,42 @@ setTimeBetweenEvictionRunsMillis(conf.timeBetweenEvictionRunsMillis); setSoftMinEvictableIdleTimeMillis(conf.softMinEvictableIdleTimeMillis); setLifo(conf.lifo); - notifyAll(); + allocate(); } //-- ObjectPool methods ------------------------------------------ public Object borrowObject() throws Exception { 
long starttime = System.currentTimeMillis(); + Latch latch = new Latch(); + synchronized (this) { + _allocationQueue.add(latch); + allocate(); + } + for(;;) { - ObjectTimestampPair pair = null; - synchronized (this) { assertOpen(); - // if there are any sleeping, just grab one of those - try { - pair = (ObjectTimestampPair)(_pool.removeFirst()); - } catch(NoSuchElementException e) { - /* ignored */ - } - - // otherwise - if(null == pair) { - // check if we can create one - // (note we know that the num sleeping is 0, else we wouldn't be here) - if(_maxActive < 0 || (_numActive + _numInternalProcessing) < _maxActive) { - // allow new object to be created - } else { - // the pool is exhausted - switch(_whenExhaustedAction) { - case WHEN_EXHAUSTED_GROW: - // allow new object to be created - break; - case WHEN_EXHAUSTED_FAIL: - throw new NoSuchElementException("Pool exhausted"); - case WHEN_EXHAUSTED_BLOCK: - try { + } + + // If no object was allocated from the pool above + if(latch._pair == null) { + // check if we were allowed to create one + if(latch._mayCreate) { + // allow new object to be created + } else { + // the pool is exhausted + switch(_whenExhaustedAction) { + case WHEN_EXHAUSTED_GROW: + // allow new object to be created + break; + case WHEN_EXHAUSTED_FAIL: + throw new NoSuchElementException("Pool exhausted"); + case WHEN_EXHAUSTED_BLOCK: + try { + synchronized (latch) { if(_maxWait <= 0) { - wait(); + latch.wait(); } else { // this code may be executed again after a notify then continue cycle // so, need to calculate the amount of time to wait @@ -961,62 +961,66 @@ final long waitTime = _maxWait - elapsed; if (waitTime > 0) { - wait(waitTime); + latch.wait(waitTime); } } - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - throw e; - } - if(_maxWait > 0 && ((System.currentTimeMillis() - starttime) >= _maxWait)) { - throw new NoSuchElementException("Timeout waiting for idle object"); - } else { - continue; // keep looping } - 
default: - throw new IllegalArgumentException("WhenExhaustedAction property " + _whenExhaustedAction + " not recognized."); - } + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; + } + if(_maxWait > 0 && ((System.currentTimeMillis() - starttime) >= _maxWait)) { + throw new NoSuchElementException("Timeout waiting for idle object"); + } else { + continue; // keep looping + } + default: + throw new IllegalArgumentException("WhenExhaustedAction property " + _whenExhaustedAction + " not recognized."); } } - _numActive++; } - // create new object when needed boolean newlyCreated = false; - if(null == pair) { + if(null == latch._pair) { try { Object obj = _factory.makeObject(); - pair = new ObjectTimestampPair(obj); + latch._pair = new ObjectTimestampPair(obj); newlyCreated = true; } finally { if (!newlyCreated) { // object cannot be created synchronized (this) { - _numActive--; - notifyAll(); + _numInternalProcessing--; + // No need to reset latch - about to throw exception + allocate(); } } } } - // activate & validate the object try { - _factory.activateObject(pair.value); - if(_testOnBorrow && !_factory.validateObject(pair.value)) { + _factory.activateObject(latch._pair.value); + if(_testOnBorrow && !_factory.validateObject(latch._pair.value)) { throw new Exception("ValidateObject failed"); } - return pair.value; + synchronized(this) { + _numInternalProcessing--; + _numActive++; + } + return latch._pair.value; } catch (Throwable e) { // object cannot be activated or is invalid try { - _factory.destroyObject(pair.value); + _factory.destroyObject(latch._pair.value); } catch (Throwable e2) { // cannot destroy broken object } synchronized (this) { - _numActive--; - notifyAll(); + _numInternalProcessing--; + latch.reset(); + _allocationQueue.add(0, latch); + allocate(); } if(newlyCreated) { throw new NoSuchElementException("Could not create a validated object, cause: " + e.getMessage()); @@ -1028,6 +1032,37 @@ } } + private synchronized 
void allocate() { + // First use any objects in the pool to clear the queue + for (;;) { + if (isClosed()) return; + if (!_pool.isEmpty() && !_allocationQueue.isEmpty()) { + Latch latch = (Latch) _allocationQueue.removeFirst(); + latch._pair = (ObjectTimestampPair) _pool.removeFirst(); + _numInternalProcessing++; + synchronized (latch) { + latch.notify(); + } + } else { + break; + } + } + // Second utilise any spare capacity to create new objects + for(;;) { + if (isClosed()) return; + if((!_allocationQueue.isEmpty()) && (_maxActive < 0 || (_numActive + _numInternalProcessing) < _maxActive)) { + Latch latch = (Latch) _allocationQueue.removeFirst(); + latch._mayCreate = true; + _numInternalProcessing++; + synchronized (latch) { + latch.notify(); + } + } else { + break; + } + } + } + public void invalidateObject(Object obj) throws Exception { try { if (_factory != null) { @@ -1036,7 +1071,7 @@ } finally { synchronized (this) { _numActive--; - notifyAll(); // _numActive has changed + allocate(); } } } @@ -1068,7 +1103,7 @@ } finally { synchronized(this) { _numInternalProcessing--; - notifyAll(); + allocate(); } } } @@ -1117,7 +1152,7 @@ // "behavior flag",decrementNumActive, from addObjectToPool. synchronized(this) { _numActive--; - notifyAll(); + allocate(); } } } @@ -1152,7 +1187,7 @@ if (decrementNumActive) { _numActive--; } - notifyAll(); + allocate(); } } } @@ -1168,7 +1203,7 @@ if (decrementNumActive) { synchronized(this) { _numActive--; - notifyAll(); + allocate(); } } } @@ -1319,7 +1354,7 @@ } finally { synchronized (this) { _numInternalProcessing--; - notifyAll(); + allocate(); } } } @@ -1490,6 +1525,26 @@ } + /** + * Latch used to control allocation order of objects to threads to ensure + * fairness. ie objects are allocated to threads in the order that threads + * request objects. + */ + private static final class Latch { + ObjectTimestampPair _pair; + boolean _mayCreate = false; + + /** + * Reset the latch data. 
Used when an allocation fails and the latch + * needs to be re-added to the queue. + */ + private void reset() { + _pair = null; + _mayCreate = false; + } + } + + //--- private attributes --------------------------------------- /** @@ -1664,4 +1719,12 @@ * number of objects but are neither active nor idle. */ private int _numInternalProcessing = 0; + + /** + * Used to track the order in which threads call {@link #borrowObject()} so + * that objects can be allocated in the order in which the threads requested + * them. + */ + private LinkedList _allocationQueue = new LinkedList(); + }