This is an automated email from the ASF dual-hosted git repository. tv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-jcs.git
commit b8486dccdc7bec2314a9d7e0b3d445c34e5169ad Author: Thomas Vandahl <t...@apache.org> AuthorDate: Tue May 28 15:46:27 2019 +0200 Use lambdas for Runnables --- .../jcs/auxiliary/disk/AbstractDiskCache.java | 37 +++++----- .../jcs/auxiliary/disk/block/BlockDiskCache.java | 29 +------- .../auxiliary/disk/indexed/IndexedDiskCache.java | 78 ++++++++-------------- .../jcs/utils/discovery/UDPDiscoveryReceiver.java | 36 ++++------ .../jcs/utils/discovery/UDPDiscoveryService.java | 4 +- 5 files changed, 62 insertions(+), 122 deletions(-) diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java index 6f42f20..a536876 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java @@ -437,32 +437,27 @@ public abstract class AbstractDiskCache<K, V> public final void dispose() throws IOException { - Runnable disR = new Runnable() + Thread t = new Thread(() -> { - @Override - public void run() + boolean keepGoing = true; + // long total = 0; + long interval = 100; + while ( keepGoing ) { - boolean keepGoing = true; - // long total = 0; - long interval = 100; - while ( keepGoing ) + keepGoing = !cacheEventQueue.isEmpty(); + try { - keepGoing = !cacheEventQueue.isEmpty(); - try - { - Thread.sleep( interval ); - // total += interval; - // log.info( "total = " + total ); - } - catch ( InterruptedException e ) - { - break; - } + Thread.sleep( interval ); + // total += interval; + // log.info( "total = " + total ); + } + catch ( InterruptedException e ) + { + break; } - log.info( "No longer waiting for event queue to finish: " + cacheEventQueue.getStatistics() ); } - }; - Thread t = new Thread( disR ); + log.info( "No longer waiting for event queue to finish: " + cacheEventQueue.getStatistics() ); + }); t.start(); // wait up to 60 seconds for dispose and then quit if not done. try diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java index 2eae22b..06f89ba 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java @@ -171,15 +171,7 @@ public class BlockDiskCache<K, V> // TODO we might need to stagger this a bit. if ( this.blockDiskCacheAttributes.getKeyPersistenceIntervalSeconds() > 0 ) { - future = scheduledExecutor.scheduleAtFixedRate( - new Runnable() - { - @Override - public void run() - { - keyStore.saveKeys(); - } - }, + future = scheduledExecutor.scheduleAtFixedRate(keyStore::saveKeys, this.blockDiskCacheAttributes.getKeyPersistenceIntervalSeconds(), this.blockDiskCacheAttributes.getKeyPersistenceIntervalSeconds(), TimeUnit.SECONDS); @@ -574,22 +566,7 @@ public class BlockDiskCache<K, V> @Override public void processDispose() { - Runnable disR = new Runnable() - { - @Override - public void run() - { - try - { - disposeInternal(); - } - catch ( InterruptedException e ) - { - log.warn( "Interrupted while diposing." ); - } - } - }; - Thread t = new Thread( disR, "BlockDiskCache-DisposalThread" ); + Thread t = new Thread(this::disposeInternal, "BlockDiskCache-DisposalThread" ); t.start(); // wait up to 60 seconds for dispose and then quit if not done. try @@ -604,10 +581,8 @@ public class BlockDiskCache<K, V> /** * Internal method that handles the disposal. - * @throws InterruptedException */ protected void disposeInternal() - throws InterruptedException { if ( !isAlive() ) { diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java index 825376e..c288784 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java @@ -79,7 +79,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> private IndexedDisk keyFile; /** Map containing the keys and disk offsets. */ - private Map<K, IndexedDiskElementDescriptor> keyHash; + private final Map<K, IndexedDiskElementDescriptor> keyHash; /** The maximum number of keys that we will keep in memory. */ private final int maxKeySize; @@ -112,11 +112,10 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> private boolean queueInput = false; /** list where puts made during optimization are made */ - private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> queuedPutList = - new ConcurrentSkipListSet<IndexedDiskElementDescriptor>(new PositionComparator()); + private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> queuedPutList; /** RECYLCE BIN -- array of empty spots */ - private ConcurrentSkipListSet<IndexedDiskElementDescriptor> recycle; + private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> recycle; /** User configurable parameters */ private final IndexedDiskCacheAttributes cattr; @@ -174,14 +173,15 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> this.diskLimitType = cattr.getDiskLimitType(); // Make a clean file name this.fileName = getCacheName().replaceAll("[^a-zA-Z0-9-_\\.]", "_"); + this.keyHash = createInitialKeyMap(); + this.queuedPutList = new ConcurrentSkipListSet<>(new PositionComparator()); + this.recycle = new ConcurrentSkipListSet<>(); try { - initializeRecycleBin(); initializeFileSystem(cattr); initializeKeysAndData(cattr); - // Initialization finished successfully, so set alive to true. setAlive(true); if (log.isInfoEnabled()) @@ -263,7 +263,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> */ private void initializeEmptyStore() throws IOException { - initializeKeyMap(); + this.keyHash.clear(); if (dataFile.length() > 0) { @@ -321,8 +321,8 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> try { - // create a key map to use. - initializeKeyMap(); + // clear a key map to use. + keyHash.clear(); HashMap<K, IndexedDiskElementDescriptor> keys = keyFile.readObject( new IndexedDiskElementDescriptor(0, (int) keyFile.length() - IndexedDisk.HEADER_SIZE_BYTES)); @@ -961,13 +961,12 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> dataFile = new IndexedDisk(new File(rafDir, fileName + ".data"), getElementSerializer()); keyFile = new IndexedDisk(new File(rafDir, fileName + ".key"), getElementSerializer()); - initializeRecycleBin(); - - initializeKeyMap(); + this.recycle.clear(); + this.keyHash.clear(); } catch (IOException e) { - log.error(logCacheName + "Failure reseting state", e); + log.error(logCacheName + "Failure resetting state", e); } finally { @@ -976,29 +975,22 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> } /** - * If the maxKeySize is < 0, use 5000, no way to have an unlimited recycle bin right now, or one - * less than the mazKeySize. - */ - private void initializeRecycleBin() - { - recycle = new ConcurrentSkipListSet<IndexedDiskElementDescriptor>(); - } - - /** * Create the map for keys that contain the index position on disk. + * + * @return a new empty Map for keys and IndexedDiskElementDescriptors */ - private void initializeKeyMap() + private Map<K, IndexedDiskElementDescriptor> createInitialKeyMap() { - keyHash = null; + Map<K, IndexedDiskElementDescriptor> keyMap = null; if (maxKeySize >= 0) { if (this.diskLimitType == DiskLimitType.COUNT) { - keyHash = new LRUMapCountLimited(maxKeySize); + keyMap = new LRUMapCountLimited(maxKeySize); } else { - keyHash = new LRUMapSizeLimited(maxKeySize); + keyMap = new LRUMapSizeLimited(maxKeySize); } if (log.isInfoEnabled()) @@ -1009,13 +1001,15 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> else { // If no max size, use a plain map for memory and processing efficiency. - keyHash = new HashMap<K, IndexedDiskElementDescriptor>(); + keyMap = new HashMap<>(); // keyHash = Collections.synchronizedMap( new HashMap() ); if (log.isInfoEnabled()) { log.info(logCacheName + "Set maxKeySize to unlimited'"); } } + + return keyMap; } /** @@ -1030,15 +1024,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> ICacheEvent<String> cacheEvent = createICacheEvent(getCacheName(), "none", ICacheEventLogger.DISPOSE_EVENT); try { - Runnable disR = new Runnable() - { - @Override - public void run() - { - disposeInternal(); - } - }; - Thread t = new Thread(disR, "IndexedDiskCache-DisposalThread"); + Thread t = new Thread(this::disposeInternal, "IndexedDiskCache-DisposalThread"); t.start(); // wait up to 60 seconds for dispose and then quit if not done. try @@ -1076,7 +1062,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> // Join with the current optimization thread. if (log.isDebugEnabled()) { - log.debug(logCacheName + "In dispose, optimization already " + "in progress; waiting for completion."); + log.debug(logCacheName + "In dispose, optimization already in progress; waiting for completion."); } try { @@ -1136,14 +1122,14 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> try { - this.adjustBytesFree(ded, true); + adjustBytesFree(ded, true); if (doRecycle) { recycle.add(ded); if (log.isDebugEnabled()) { - log.debug(logCacheName + "recycled ded" + ded); + log.debug(logCacheName + "recycled ded " + ded); } } @@ -1178,15 +1164,9 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> { if (currentOptimizationThread == null) { - currentOptimizationThread = new Thread(new Runnable() - { - @Override - public void run() - { - optimizeFile(); - - currentOptimizationThread = null; - } + currentOptimizationThread = new Thread(() -> { + optimizeFile(); + currentOptimizationThread = null; }, "IndexedDiskCache-OptimizationThread"); } } @@ -1279,7 +1259,7 @@ public class IndexedDiskCache<K, V> extends AbstractDiskCache<K, V> // RESTORE NORMAL OPERATION removeCount = 0; resetBytesFree(); - initializeRecycleBin(); + this.recycle.clear(); queuedPutList.clear(); queueInput = false; // turn recycle back on. diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java index 0f5f510..b519159 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java @@ -26,6 +26,7 @@ import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.jcs.engine.CacheInfo; import org.apache.commons.jcs.engine.behavior.IShutdownObserver; @@ -57,19 +58,19 @@ public class UDPDiscoveryReceiver private static final int maxPoolSize = 2; /** The processor */ - private ExecutorService pooledExecutor = null; + private final ExecutorService pooledExecutor; /** number of messages received. For debugging and testing. */ - private int cnt = 0; + private AtomicInteger cnt = new AtomicInteger(0); /** Service to get cache names and handle request broadcasts */ - private UDPDiscoveryService service = null; + private final UDPDiscoveryService service; /** Address */ - private String multicastAddressString = ""; + private final String multicastAddressString; /** The port */ - private int multicastPort = 0; + private final int multicastPort; /** Is it shutdown. */ private boolean shutdown = false; @@ -92,7 +93,7 @@ public class UDPDiscoveryReceiver this.multicastPort = multicastPort; // create a small thread pool to handle a barrage - pooledExecutor = ThreadPoolManager.getInstance().createPool( + this.pooledExecutor = ThreadPoolManager.getInstance().createPool( new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0, WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize), "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY); @@ -101,16 +102,7 @@ public class UDPDiscoveryReceiver log.info( "Constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" ); } - try - { - createSocket( this.multicastAddressString, this.multicastPort ); - } - catch ( IOException ioe ) - { - // consider eating this so we can go on, or constructing the socket - // later - throw ioe; - } + createSocket( this.multicastAddressString, this.multicastPort ); } /** @@ -165,9 +157,8 @@ public class UDPDiscoveryReceiver log.debug( "Received packet from address [" + packet.getSocketAddress() + "]" ); } - final ByteArrayInputStream byteStream = new ByteArrayInputStream( mBuffer, 0, packet.getLength() ); - - try (ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware( byteStream, null )) + try (ByteArrayInputStream byteStream = new ByteArrayInputStream(mBuffer, 0, packet.getLength()); + ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware(byteStream, null)) { obj = objectStream.readObject(); } @@ -204,8 +195,7 @@ public class UDPDiscoveryReceiver { Object obj = waitForMessage(); - // not thread safe, but just for debugging - cnt++; + cnt.incrementAndGet(); if ( log.isDebugEnabled() ) { @@ -261,7 +251,7 @@ public class UDPDiscoveryReceiver */ public void setCnt( int cnt ) { - this.cnt = cnt; + this.cnt.set(cnt); } /** @@ -269,7 +259,7 @@ public class UDPDiscoveryReceiver */ public int getCnt() { - return cnt; + return cnt.get(); } /** diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java index b2047d7..15b635f 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java @@ -300,8 +300,8 @@ public class UDPDiscoveryService */ public void startup() { - udpReceiverThread = new Thread( receiver ); - udpReceiverThread.setDaemon( true ); + udpReceiverThread = new Thread(receiver); + udpReceiverThread.setDaemon(true); // udpReceiverThread.setName( t.getName() + "--UDPReceiver" ); udpReceiverThread.start(); }