This is an automated email from the ASF dual-hosted git repository. psteitz pushed a commit to branch POOL_2_X in repository https://gitbox.apache.org/repos/asf/commons-pool.git
commit 078f737490805b38b8993287c6b3bcfbd27332da Author: Phil Steitz <phil.ste...@gmail.com> AuthorDate: Thu May 22 15:08:32 2025 -0700 JIRA: POOL-419. Add sync on pooled object in GOP returnObject to handle concurrent return/invalidate by client threads. --- .../commons/pool2/impl/GenericObjectPool.java | 117 +++++++++++---------- .../commons/pool2/impl/TestGenericObjectPool.java | 80 +++++++++++++- 2 files changed, 138 insertions(+), 59 deletions(-) diff --git a/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java b/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java index 18caa97e..63fa53a2 100644 --- a/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java +++ b/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java @@ -1016,75 +1016,76 @@ public class GenericObjectPool<T> extends BaseGenericObjectPool<T> } return; // Object was abandoned and removed } + synchronized (p) { + markReturningState(p); - markReturningState(p); - - final Duration activeTime = p.getActiveDuration(); + final Duration activeTime = p.getActiveDuration(); - if (getTestOnReturn() && !factory.validateObject(p)) { - try { - destroy(p, DestroyMode.NORMAL); - } catch (final Exception e) { - swallowException(e); - } - try { - ensureIdle(1, false); - } catch (final Exception e) { - swallowException(e); + if (getTestOnReturn() && !factory.validateObject(p)) { + try { + destroy(p, DestroyMode.NORMAL); + } catch (final Exception e) { + swallowException(e); + } + try { + ensureIdle(1, false); + } catch (final Exception e) { + swallowException(e); + } + updateStatsReturn(activeTime); + return; } - updateStatsReturn(activeTime); - return; - } - try { - factory.passivateObject(p); - } catch (final Exception e1) { - swallowException(e1); try { - destroy(p, DestroyMode.NORMAL); - } catch (final Exception e) { - swallowException(e); - } - try { - ensureIdle(1, false); - } catch (final Exception e) { - swallowException(e); + factory.passivateObject(p); + } catch (final Exception e1) { + swallowException(e1); + try { + destroy(p, DestroyMode.NORMAL); + } catch (final Exception e) { + swallowException(e); + } + try { + ensureIdle(1, false); + } catch (final Exception e) { + swallowException(e); + } + updateStatsReturn(activeTime); + return; } - updateStatsReturn(activeTime); - return; - } - if (!p.deallocate()) { - throw new IllegalStateException( - "Object has already been returned to this pool or is invalid"); - } - - final int maxIdleSave = getMaxIdle(); - if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) { - try { - destroy(p, DestroyMode.NORMAL); - } catch (final Exception e) { - swallowException(e); - } - try { - ensureIdle(1, false); - } catch (final Exception e) { - swallowException(e); + if (!p.deallocate()) { + throw new IllegalStateException( + "Object has already been returned to this pool or is invalid"); } - } else { - if (getLifo()) { - idleObjects.addFirst(p); + + final int maxIdleSave = getMaxIdle(); + if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) { + try { + destroy(p, DestroyMode.NORMAL); + } catch (final Exception e) { + swallowException(e); + } + try { + ensureIdle(1, false); + } catch (final Exception e) { + swallowException(e); + } } else { - idleObjects.addLast(p); - } - if (isClosed()) { - // Pool closed while object was being added to idle objects. - // Make sure the returned object is destroyed rather than left - // in the idle object pool (which would effectively be a leak) - clear(); + if (getLifo()) { + idleObjects.addFirst(p); + } else { + idleObjects.addLast(p); + } + if (isClosed()) { + // Pool closed while object was being added to idle objects. + // Make sure the returned object is destroyed rather than left + // in the idle object pool (which would effectively be a leak) + clear(); + } } + updateStatsReturn(activeTime); } - updateStatsReturn(activeTime); } /** diff --git a/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java b/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java index a258433d..13390dad 100644 --- a/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java +++ b/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java @@ -42,6 +42,10 @@ import java.util.Random; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -2901,6 +2905,80 @@ public class TestGenericObjectPool extends TestBaseObjectPool { final Thread t = new Thread(thread); t.start(); Thread.sleep(50); // Wait for the thread to start - pool.addObject(); // Should return immediately + pool.addObject(); // Should return immediately + } + + private BasePooledObjectFactory<Object> createPooledObjectFactory() { + return new BasePooledObjectFactory<Object>() { + @Override + public Object create() { + return new Object(); + } + + @Override + public PooledObject<Object> wrap(final Object obj) { + return new DefaultPooledObject<>(obj); + } + }; + } + + + /* + * Test for POOL-419. + * https://issues.apache.org/jira/browse/POOL-419 + */ + @Test + void testPool419() throws Exception { + + final ExecutorService executor = Executors.newFixedThreadPool(100); + + final GenericObjectPoolConfig<Object> config = new GenericObjectPoolConfig<>(); + + final int maxConnections = 10000; + + config.setMaxTotal(maxConnections); + config.setMaxIdle(maxConnections); + config.setMinIdle(1); + + final BasePooledObjectFactory<Object> pof = createPooledObjectFactory(); + + try (ObjectPool<Object> connectionPool = new GenericObjectPool<>(pof, config)) { + assertNotNull(connectionPool); + + final CountDownLatch startLatch = new CountDownLatch(1); + + final List<Object> poolObjects = new ArrayList<>(); + + final List<FutureTask<Boolean>> tasks = new ArrayList<>(); + + for (int i = 0; i < maxConnections; i++) { + poolObjects.add(connectionPool.borrowObject()); + } + + for (Object poolObject : poolObjects) { + + tasks.add(new FutureTask<>(() -> { + startLatch.await(); + connectionPool.invalidateObject(poolObject); + return true; + })); + + tasks.add(new FutureTask<>(() -> { + startLatch.await(); + connectionPool.returnObject(poolObject); + return true; + })); + } + + tasks.forEach(executor::submit); + + startLatch.countDown(); // Start all tasks simultaneously + + executor.shutdown(); + assertTrue(executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)); + + assertEquals(0, connectionPool.getNumActive(), "getNumActive() must not return a negative value"); + + } } }