This is an automated email from the ASF dual-hosted git repository. tv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-jcs.git
commit aefca15e3a890f153b328af8369632d7c24dfade Author: Thomas Vandahl <[email protected]> AuthorDate: Tue Apr 14 18:42:33 2026 +0200 Make CacheEventQueueFactory static --- .../jcs4/auxiliary/disk/AbstractDiskCache.java | 3 +- .../auxiliary/disk/indexed/IndexedDiskCache.java | 87 +++++++++++----------- .../lateral/socket/tcp/LateralTCPCacheNoWait.java | 9 ++- .../jcs4/auxiliary/remote/RemoteCacheNoWait.java | 3 +- .../auxiliary/remote/server/RemoteCacheServer.java | 6 +- .../jcs4/engine/CacheEventQueueFactory.java | 35 +++++---- .../engine/CacheEventQueueFactoryUnitTest.java | 10 +-- 7 files changed, 77 insertions(+), 76 deletions(-) diff --git a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/disk/AbstractDiskCache.java b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/disk/AbstractDiskCache.java index 766eec26..313e3adf 100644 --- a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/disk/AbstractDiskCache.java +++ b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/disk/AbstractDiskCache.java @@ -258,8 +258,7 @@ public abstract class AbstractDiskCache<K, V> setAuxiliaryCacheAttributes(attr); // create queue - final CacheEventQueueFactory<K, V> fact = new CacheEventQueueFactory<>(); - this.cacheEventQueue = fact.createCacheEventQueue( + this.cacheEventQueue = CacheEventQueueFactory.createCacheEventQueue( new MyCacheListener(), CacheInfo.INSTANCE.listenerId(), getCacheName(), attr.getEventQueuePoolName(), attr.getEventQueueType() ); diff --git a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/disk/indexed/IndexedDiskCache.java b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/disk/indexed/IndexedDiskCache.java index c9fb8370..a0969e54 100644 --- a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/disk/indexed/IndexedDiskCache.java +++ b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/disk/indexed/IndexedDiskCache.java @@ -32,6 +32,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -51,6 +55,7 @@ import org.apache.commons.jcs4.log.Log; import org.apache.commons.jcs4.utils.serialization.StandardSerializer; import org.apache.commons.jcs4.utils.struct.AbstractLRUMap; import org.apache.commons.jcs4.utils.struct.LRUMap; +import org.apache.commons.jcs4.utils.threadpool.DaemonThreadFactory; import org.apache.commons.jcs4.utils.timing.ElapsedTimer; /** @@ -236,7 +241,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> private File rafDir; /** Should we keep adding to the recycle bin. False during optimization. */ - private boolean doRecycle = true; + private AtomicBoolean doRecycle = new AtomicBoolean(true); /** Should we optimize real time */ private boolean isRealTimeOptimizationEnabled = true; @@ -245,13 +250,13 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> private boolean isShutdownOptimizationEnabled = true; /** Are we currently optimizing the files */ - private boolean isOptimizing; + private final AtomicBoolean isOptimizing = new AtomicBoolean(); /** The number of times the file has been optimized. */ private int timesOptimized; - /** The thread optimizing the file. */ - private volatile Thread currentOptimizationThread; + /** The Executor for optimizing the file. */ + private volatile ExecutorService optimizationExecutor; /** Used for counting the number of requests */ private int removeCount; @@ -311,6 +316,13 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> this.maxKeySize = cattr.getMaxKeySize(); this.isRealTimeOptimizationEnabled = cattr.getOptimizeAtRemoveCount() > 0; this.isShutdownOptimizationEnabled = cattr.isOptimizeOnShutdown(); + + if (isRealTimeOptimizationEnabled) + { + this.optimizationExecutor = Executors.newSingleThreadExecutor( + new DaemonThreadFactory("IndexedDiskCache-Optimization-")); + } + this.logCacheName = "Region [" + getCacheName() + "] "; this.diskLimitType = cattr.getDiskLimitType(); // Make a clean file name @@ -364,7 +376,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> { adjustBytesFree(ded, true); - if (doRecycle) + if (doRecycle.get()) { recycle.add(ded); log.debug("{0}: recycled ded {1}", logCacheName, ded); @@ -414,7 +426,8 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> final ElapsedTimer timer = new ElapsedTimer(); boolean isOk = true; long expectedNextPos = 0; - for (final IndexedDiskElementDescriptor ded : sortedDescriptors) { + for (final IndexedDiskElementDescriptor ded : sortedDescriptors) + { if (expectedNextPos > ded.pos()) { log.error("{0}: Corrupt file: overlapping deds {1}", logCacheName, ded); @@ -599,21 +612,23 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> // Prevents any interaction with the cache while we're shutting down. setAlive(false); - final Thread optimizationThread = currentOptimizationThread; - if (isRealTimeOptimizationEnabled && optimizationThread != null) + if (isRealTimeOptimizationEnabled) { - // Join with the current optimization thread. - log.debug("{0}: In dispose, optimization already in progress; waiting for completion.", - logCacheName); - - try - { - optimizationThread.join(); - } - catch (final InterruptedException e) + // Shut down the optimization executor. + optimizationExecutor.shutdown(); + if (isOptimizing.get()) { - log.error("{0}: Unable to join current optimization thread.", - logCacheName, e); + log.debug("{0}: In dispose, optimization already in progress; waiting for completion.", + logCacheName); + try + { + optimizationExecutor.awaitTermination(30, TimeUnit.SECONDS); + } + catch (final InterruptedException e) + { + log.error("{0}: Timeout waiting for optimization to complete.", + logCacheName, e); + } } } else if (isShutdownOptimizationEnabled && this.getBytesFree() > 0) @@ -647,37 +662,23 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> protected void doOptimizeRealTime() { int optRemoveCount = getAuxiliaryCacheAttributes().getOptimizeAtRemoveCount(); - if (isRealTimeOptimizationEnabled && !isOptimizing - && removeCount++ >= optRemoveCount) + if (isRealTimeOptimizationEnabled && removeCount++ >= optRemoveCount) { - isOptimizing = true; - - log.info("{0}: Optimizing file. removeCount [{1}] OptimizeAtRemoveCount [{2}]", - logCacheName, removeCount, optRemoveCount); - - if (currentOptimizationThread == null) + if (isOptimizing.compareAndSet(false, true)) { + log.info("{0}: Optimizing file. removeCount [{1}] OptimizeAtRemoveCount [{2}]", + logCacheName, removeCount, optRemoveCount); + storageLock.writeLock().lock(); try { - if (currentOptimizationThread == null) - { - currentOptimizationThread = new Thread(() -> { - optimizeFile(); - currentOptimizationThread = null; - }, "IndexedDiskCache-OptimizationThread"); - } + optimizationExecutor.execute(this::optimizeFile); } finally { storageLock.writeLock().unlock(); } - - if (currentOptimizationThread != null) - { - currentOptimizationThread.start(); - } } } } @@ -1059,7 +1060,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> { queueInput = true; // shut off recycle while we're optimizing, - doRecycle = false; + doRecycle.set(false); defragList = createPositionSortedDescriptorList(); } finally @@ -1098,8 +1099,8 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> queuedPutList.clear(); queueInput = false; // turn recycle back on. - doRecycle = true; - isOptimizing = false; + doRecycle.set(true); + isOptimizing.set(false); } finally { @@ -1443,7 +1444,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> // we need this to compare in the recycle bin ded = new IndexedDiskElementDescriptor(dataFile.length(), data.length); - if (doRecycle) + if (doRecycle.get()) { final IndexedDiskElementDescriptor rep = recycle.ceiling(ded); if (rep != null) diff --git a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/lateral/socket/tcp/LateralTCPCacheNoWait.java b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/lateral/socket/tcp/LateralTCPCacheNoWait.java index d86e4b24..f1ee3ddc 100644 --- a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/lateral/socket/tcp/LateralTCPCacheNoWait.java +++ b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/lateral/socket/tcp/LateralTCPCacheNoWait.java @@ -85,8 +85,8 @@ public class LateralTCPCacheNoWait<K, V> log.debug( "Constructing LateralTCPCacheNoWait, LateralTCPCache = [{0}]", cache ); - final CacheEventQueueFactory<K, V> fact = new CacheEventQueueFactory<>(); - this.eventQueue = fact.createCacheEventQueue( new CacheAdaptor<>( cache ), + this.eventQueue = CacheEventQueueFactory.createCacheEventQueue( + new CacheAdaptor<>( cache ), CacheInfo.INSTANCE.listenerId(), cache.getCacheName(), getAuxiliaryCacheAttributes().getEventQueuePoolName(), getAuxiliaryCacheAttributes().getEventQueueType() ); @@ -380,8 +380,9 @@ public class LateralTCPCacheNoWait<K, V> { eventQueue.destroy(); } - final CacheEventQueueFactory<K, V> fact = new CacheEventQueueFactory<>(); - this.eventQueue = fact.createCacheEventQueue( new CacheAdaptor<>( cache ), + + this.eventQueue = CacheEventQueueFactory.createCacheEventQueue( + new CacheAdaptor<>( cache ), CacheInfo.INSTANCE.listenerId(), cache.getCacheName(), getAuxiliaryCacheAttributes().getEventQueuePoolName(), getAuxiliaryCacheAttributes().getEventQueueType() ); diff --git a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/remote/RemoteCacheNoWait.java b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/remote/RemoteCacheNoWait.java index 47b4ec07..be8d140b 100644 --- a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/remote/RemoteCacheNoWait.java +++ b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/remote/RemoteCacheNoWait.java @@ -108,8 +108,7 @@ public class RemoteCacheNoWait<K, V> */ private ICacheEventQueue<K, V> createCacheEventQueue( final IRemoteCacheClient<K, V> client ) { - final CacheEventQueueFactory<K, V> factory = new CacheEventQueueFactory<>(); - return factory.createCacheEventQueue( + return CacheEventQueueFactory.createCacheEventQueue( new CacheAdaptor<>( client ), client.getListenerId(), client.getCacheName(), diff --git a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/remote/server/RemoteCacheServer.java b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/remote/server/RemoteCacheServer.java index 3d76988c..2bbb0fdc 100644 --- a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/remote/server/RemoteCacheServer.java +++ b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/remote/server/RemoteCacheServer.java @@ -289,9 +289,9 @@ public class RemoteCacheServer<K, V> } } - final CacheEventQueueFactory<KK, VV> fact = new CacheEventQueueFactory<>(); - final ICacheEventQueue<KK, VV> q = fact.createCacheEventQueue( listener, id, cacheName, remoteCacheServerAttributes - .getEventQueuePoolName(), remoteCacheServerAttributes.getEventQueueType() ); + final ICacheEventQueue<KK, VV> q = CacheEventQueueFactory.createCacheEventQueue( + listener, id, cacheName, remoteCacheServerAttributes.getEventQueuePoolName(), + remoteCacheServerAttributes.getEventQueueType() ); eventQMap.put(Long.valueOf(listener.getListenerId()), q); diff --git a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/engine/CacheEventQueueFactory.java b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/engine/CacheEventQueueFactory.java index c65c581e..bd7f0b54 100644 --- a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/engine/CacheEventQueueFactory.java +++ b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/engine/CacheEventQueueFactory.java @@ -24,10 +24,9 @@ import org.apache.commons.jcs4.engine.behavior.ICacheListener; import org.apache.commons.jcs4.log.Log; /** - * This class hands out event Queues. This allows us to change the implementation more easily. You - * can configure the cache to use a custom type. + * This class hands out event Queues. This allows us to change the implementation more easily. */ -public class CacheEventQueueFactory<K, V> +public class CacheEventQueueFactory { /** The logger. */ private static final Log log = Log.getLog( CacheEventQueueFactory.class ); @@ -44,24 +43,27 @@ public class CacheEventQueueFactory<K, V> * @param poolType single or pooled * @return ICacheEventQueue */ - public ICacheEventQueue<K, V> createCacheEventQueue( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, - final int maxFailure, final int waitBeforeRetry, final String threadPoolName, - final ICacheEventQueue.QueueType poolType ) + public static <K, V> ICacheEventQueue<K, V> createCacheEventQueue( + final ICacheListener<K, V> listener, final long listenerId, final String cacheName, + final int maxFailure, final int waitBeforeRetry, final String threadPoolName, + ICacheEventQueue.QueueType poolType ) { log.debug( "threadPoolName = [{0}] poolType = {1}", threadPoolName, poolType ); - ICacheEventQueue<K, V> eventQueue = null; - if ( poolType == null || ICacheEventQueue.QueueType.SINGLE == poolType ) + if (poolType == null) { - eventQueue = new CacheEventQueue<>( listener, listenerId, cacheName, maxFailure, waitBeforeRetry ); + poolType = ICacheEventQueue.QueueType.SINGLE; } - else if ( ICacheEventQueue.QueueType.POOLED == poolType ) + + switch (poolType) { - eventQueue = new PooledCacheEventQueue<>( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, - threadPoolName ); - } + case POOLED: return new PooledCacheEventQueue<>(listener, listenerId, cacheName, + maxFailure, waitBeforeRetry, threadPoolName); - return eventQueue; + case SINGLE: + default: return new CacheEventQueue<>(listener, listenerId, cacheName, + maxFailure, waitBeforeRetry); + } } /** @@ -74,8 +76,9 @@ public class CacheEventQueueFactory<K, V> * @param poolType SINGLE, POOLED * @return ICacheEventQueue */ - public ICacheEventQueue<K, V> createCacheEventQueue( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, - final String threadPoolName, final ICacheEventQueue.QueueType poolType ) + public static <K, V> ICacheEventQueue<K, V> createCacheEventQueue( + final ICacheListener<K, V> listener, final long listenerId, final String cacheName, + final String threadPoolName, final ICacheEventQueue.QueueType poolType ) { return createCacheEventQueue( listener, listenerId, cacheName, 10, 500, threadPoolName, poolType ); } diff --git a/commons-jcs4-core/src/test/java/org/apache/commons/jcs4/engine/CacheEventQueueFactoryUnitTest.java b/commons-jcs4-core/src/test/java/org/apache/commons/jcs4/engine/CacheEventQueueFactoryUnitTest.java index 417f443c..d2b1eff6 100644 --- a/commons-jcs4-core/src/test/java/org/apache/commons/jcs4/engine/CacheEventQueueFactoryUnitTest.java +++ b/commons-jcs4-core/src/test/java/org/apache/commons/jcs4/engine/CacheEventQueueFactoryUnitTest.java @@ -41,10 +41,9 @@ class CacheEventQueueFactoryUnitTest final ICacheListener<String, String> listener = new MockRemoteCacheListener<>(); final long listenerId = 1; - final CacheEventQueueFactory<String, String> factory = new CacheEventQueueFactory<>(); - // DO WORK - final ICacheEventQueue<String, String> result = factory.createCacheEventQueue( listener, listenerId, "cacheName", "threadPoolName", eventQueueType ); + final ICacheEventQueue<String, String> result = CacheEventQueueFactory.createCacheEventQueue( + listener, listenerId, "cacheName", "threadPoolName", eventQueueType ); // VERIFY assertNotNull( result, "Should have a result" ); @@ -60,10 +59,9 @@ class CacheEventQueueFactoryUnitTest final ICacheListener<String, String> listener = new MockRemoteCacheListener<>(); final long listenerId = 1; - final CacheEventQueueFactory<String, String> factory = new CacheEventQueueFactory<>(); - // DO WORK - final ICacheEventQueue<String, String> result = factory.createCacheEventQueue( listener, listenerId, "cacheName", "threadPoolName", eventQueueType ); + final ICacheEventQueue<String, String> result = CacheEventQueueFactory.createCacheEventQueue( + listener, listenerId, "cacheName", "threadPoolName", eventQueueType ); // VERIFY assertNotNull( result, "Should have a result" );
