Author: tv Date: Fri May 24 09:12:04 2019 New Revision: 1859857 URL: http://svn.apache.org/viewvc?rev=1859857&view=rev Log: Use lambdas for Runnables
Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java?rev=1859857&r1=1859856&r2=1859857&view=diff ============================================================================== --- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java (original) +++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/AbstractDiskCache.java Fri May 24 09:12:04 2019 @@ -437,32 +437,27 @@ public abstract class AbstractDiskCache< public final void dispose() throws IOException { - Runnable disR = new Runnable() + Thread t = new Thread(() -> { - @Override - public void run() + boolean keepGoing = true; + // long total = 0; + long interval = 100; + while ( keepGoing ) { - boolean keepGoing = true; - // long total = 0; - long interval = 100; - while ( keepGoing ) + keepGoing = !cacheEventQueue.isEmpty(); + try { - keepGoing = !cacheEventQueue.isEmpty(); - try - { - Thread.sleep( interval ); - // total += interval; - // log.info( "total = " + total ); - } - catch ( InterruptedException e ) - { - break; - } + Thread.sleep( interval ); + // total += interval; + // log.info( "total = " + total ); + } + catch ( InterruptedException e ) + { + break; } - log.info( "No longer waiting for event queue to finish: " + cacheEventQueue.getStatistics() ); } - }; - Thread t = new Thread( disR ); + log.info( "No longer waiting for event queue to finish: " + cacheEventQueue.getStatistics() ); + }); t.start(); // wait up to 60 seconds for dispose and then quit if not done. try Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java?rev=1859857&r1=1859856&r2=1859857&view=diff ============================================================================== --- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java (original) +++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/block/BlockDiskCache.java Fri May 24 09:12:04 2019 @@ -171,15 +171,7 @@ public class BlockDiskCache<K, V> // TODO we might need to stagger this a bit. if ( this.blockDiskCacheAttributes.getKeyPersistenceIntervalSeconds() > 0 ) { - future = scheduledExecutor.scheduleAtFixedRate( - new Runnable() - { - @Override - public void run() - { - keyStore.saveKeys(); - } - }, + future = scheduledExecutor.scheduleAtFixedRate(keyStore::saveKeys, this.blockDiskCacheAttributes.getKeyPersistenceIntervalSeconds(), this.blockDiskCacheAttributes.getKeyPersistenceIntervalSeconds(), TimeUnit.SECONDS); @@ -574,22 +566,7 @@ public class BlockDiskCache<K, V> @Override public void processDispose() { - Runnable disR = new Runnable() - { - @Override - public void run() - { - try - { - disposeInternal(); - } - catch ( InterruptedException e ) - { - log.warn( "Interrupted while diposing." ); - } - } - }; - Thread t = new Thread( disR, "BlockDiskCache-DisposalThread" ); + Thread t = new Thread(this::disposeInternal, "BlockDiskCache-DisposalThread" ); t.start(); // wait up to 60 seconds for dispose and then quit if not done. try @@ -604,10 +581,8 @@ public class BlockDiskCache<K, V> /** * Internal method that handles the disposal. - * @throws InterruptedException */ protected void disposeInternal() - throws InterruptedException { if ( !isAlive() ) { Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java?rev=1859857&r1=1859856&r2=1859857&view=diff ============================================================================== --- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java (original) +++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/auxiliary/disk/indexed/IndexedDiskCache.java Fri May 24 09:12:04 2019 @@ -79,7 +79,7 @@ public class IndexedDiskCache<K, V> exte private IndexedDisk keyFile; /** Map containing the keys and disk offsets. */ - private Map<K, IndexedDiskElementDescriptor> keyHash; + private final Map<K, IndexedDiskElementDescriptor> keyHash; /** The maximum number of keys that we will keep in memory. */ private final int maxKeySize; @@ -112,11 +112,10 @@ public class IndexedDiskCache<K, V> exte private boolean queueInput = false; /** list where puts made during optimization are made */ - private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> queuedPutList = - new ConcurrentSkipListSet<IndexedDiskElementDescriptor>(new PositionComparator()); + private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> queuedPutList; /** RECYLCE BIN -- array of empty spots */ - private ConcurrentSkipListSet<IndexedDiskElementDescriptor> recycle; + private final ConcurrentSkipListSet<IndexedDiskElementDescriptor> recycle; /** User configurable parameters */ private final IndexedDiskCacheAttributes cattr; @@ -174,14 +173,15 @@ public class IndexedDiskCache<K, V> exte this.diskLimitType = cattr.getDiskLimitType(); // Make a clean file name this.fileName = getCacheName().replaceAll("[^a-zA-Z0-9-_\\.]", "_"); + this.keyHash = createInitialKeyMap(); + this.queuedPutList = new ConcurrentSkipListSet<>(new PositionComparator()); + this.recycle = new ConcurrentSkipListSet<>(); try { - initializeRecycleBin(); initializeFileSystem(cattr); initializeKeysAndData(cattr); - // Initialization finished successfully, so set alive to true. setAlive(true); if (log.isInfoEnabled()) @@ -263,7 +263,7 @@ public class IndexedDiskCache<K, V> exte */ private void initializeEmptyStore() throws IOException { - initializeKeyMap(); + this.keyHash.clear(); if (dataFile.length() > 0) { @@ -321,8 +321,8 @@ public class IndexedDiskCache<K, V> exte try { - // create a key map to use. - initializeKeyMap(); + // clear a key map to use. + keyHash.clear(); HashMap<K, IndexedDiskElementDescriptor> keys = keyFile.readObject( new IndexedDiskElementDescriptor(0, (int) keyFile.length() - IndexedDisk.HEADER_SIZE_BYTES)); @@ -961,13 +961,12 @@ public class IndexedDiskCache<K, V> exte dataFile = new IndexedDisk(new File(rafDir, fileName + ".data"), getElementSerializer()); keyFile = new IndexedDisk(new File(rafDir, fileName + ".key"), getElementSerializer()); - initializeRecycleBin(); - - initializeKeyMap(); + this.recycle.clear(); + this.keyHash.clear(); } catch (IOException e) { - log.error(logCacheName + "Failure reseting state", e); + log.error(logCacheName + "Failure resetting state", e); } finally { @@ -976,29 +975,22 @@ public class IndexedDiskCache<K, V> exte } /** - * If the maxKeySize is < 0, use 5000, no way to have an unlimited recycle bin right now, or one - * less than the mazKeySize. - */ - private void initializeRecycleBin() - { - recycle = new ConcurrentSkipListSet<IndexedDiskElementDescriptor>(); - } - - /** * Create the map for keys that contain the index position on disk. + * + * @return a new empty Map for keys and IndexedDiskElementDescriptors */ - private void initializeKeyMap() + private Map<K, IndexedDiskElementDescriptor> createInitialKeyMap() { - keyHash = null; + Map<K, IndexedDiskElementDescriptor> keyMap = null; if (maxKeySize >= 0) { if (this.diskLimitType == DiskLimitType.COUNT) { - keyHash = new LRUMapCountLimited(maxKeySize); + keyMap = new LRUMapCountLimited(maxKeySize); } else { - keyHash = new LRUMapSizeLimited(maxKeySize); + keyMap = new LRUMapSizeLimited(maxKeySize); } if (log.isInfoEnabled()) @@ -1009,13 +1001,15 @@ public class IndexedDiskCache<K, V> exte else { // If no max size, use a plain map for memory and processing efficiency. - keyHash = new HashMap<K, IndexedDiskElementDescriptor>(); + keyMap = new HashMap<>(); // keyHash = Collections.synchronizedMap( new HashMap() ); if (log.isInfoEnabled()) { log.info(logCacheName + "Set maxKeySize to unlimited'"); } } + + return keyMap; } /** @@ -1030,15 +1024,7 @@ public class IndexedDiskCache<K, V> exte ICacheEvent<String> cacheEvent = createICacheEvent(getCacheName(), "none", ICacheEventLogger.DISPOSE_EVENT); try { - Runnable disR = new Runnable() - { - @Override - public void run() - { - disposeInternal(); - } - }; - Thread t = new Thread(disR, "IndexedDiskCache-DisposalThread"); + Thread t = new Thread(this::disposeInternal, "IndexedDiskCache-DisposalThread"); t.start(); // wait up to 60 seconds for dispose and then quit if not done. try @@ -1076,7 +1062,7 @@ public class IndexedDiskCache<K, V> exte // Join with the current optimization thread. if (log.isDebugEnabled()) { - log.debug(logCacheName + "In dispose, optimization already " + "in progress; waiting for completion."); + log.debug(logCacheName + "In dispose, optimization already in progress; waiting for completion."); } try { @@ -1136,14 +1122,14 @@ public class IndexedDiskCache<K, V> exte try { - this.adjustBytesFree(ded, true); + adjustBytesFree(ded, true); if (doRecycle) { recycle.add(ded); if (log.isDebugEnabled()) { - log.debug(logCacheName + "recycled ded" + ded); + log.debug(logCacheName + "recycled ded " + ded); } } @@ -1178,15 +1164,9 @@ public class IndexedDiskCache<K, V> exte { if (currentOptimizationThread == null) { - currentOptimizationThread = new Thread(new Runnable() - { - @Override - public void run() - { - optimizeFile(); - - currentOptimizationThread = null; - } + currentOptimizationThread = new Thread(() -> { + optimizeFile(); + currentOptimizationThread = null; }, "IndexedDiskCache-OptimizationThread"); } } @@ -1279,7 +1259,7 @@ public class IndexedDiskCache<K, V> exte // RESTORE NORMAL OPERATION removeCount = 0; resetBytesFree(); - initializeRecycleBin(); + this.recycle.clear(); queuedPutList.clear(); queueInput = false; // turn recycle back on. Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java?rev=1859857&r1=1859856&r2=1859857&view=diff ============================================================================== --- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java (original) +++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryReceiver.java Fri May 24 09:12:04 2019 @@ -26,6 +26,7 @@ import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.jcs.engine.CacheInfo; import org.apache.commons.jcs.engine.behavior.IShutdownObserver; @@ -57,19 +58,19 @@ public class UDPDiscoveryReceiver private static final int maxPoolSize = 2; /** The processor */ - private ExecutorService pooledExecutor = null; + private final ExecutorService pooledExecutor; /** number of messages received. For debugging and testing. */ - private int cnt = 0; + private AtomicInteger cnt = new AtomicInteger(0); /** Service to get cache names and handle request broadcasts */ - private UDPDiscoveryService service = null; + private final UDPDiscoveryService service; /** Address */ - private String multicastAddressString = ""; + private final String multicastAddressString; /** The port */ - private int multicastPort = 0; + private final int multicastPort; /** Is it shutdown. */ private boolean shutdown = false; @@ -92,7 +93,7 @@ public class UDPDiscoveryReceiver this.multicastPort = multicastPort; // create a small thread pool to handle a barrage - pooledExecutor = ThreadPoolManager.getInstance().createPool( + this.pooledExecutor = ThreadPoolManager.getInstance().createPool( new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0, WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize), "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY); @@ -101,16 +102,7 @@ public class UDPDiscoveryReceiver log.info( "Constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" ); } - try - { - createSocket( this.multicastAddressString, this.multicastPort ); - } - catch ( IOException ioe ) - { - // consider eating this so we can go on, or constructing the socket - // later - throw ioe; - } + createSocket( this.multicastAddressString, this.multicastPort ); } /** @@ -165,9 +157,8 @@ public class UDPDiscoveryReceiver log.debug( "Received packet from address [" + packet.getSocketAddress() + "]" ); } - final ByteArrayInputStream byteStream = new ByteArrayInputStream( mBuffer, 0, packet.getLength() ); - - try (ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware( byteStream, null )) + try (ByteArrayInputStream byteStream = new ByteArrayInputStream(mBuffer, 0, packet.getLength()); + ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware(byteStream, null)) { obj = objectStream.readObject(); } @@ -204,8 +195,7 @@ public class UDPDiscoveryReceiver { Object obj = waitForMessage(); - // not thread safe, but just for debugging - cnt++; + cnt.incrementAndGet(); if ( log.isDebugEnabled() ) { @@ -261,7 +251,7 @@ public class UDPDiscoveryReceiver */ public void setCnt( int cnt ) { - this.cnt = cnt; + this.cnt.set(cnt); } /** @@ -269,7 +259,7 @@ public class UDPDiscoveryReceiver */ public int getCnt() { - return cnt; + return cnt.get(); } /** Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java?rev=1859857&r1=1859856&r2=1859857&view=diff ============================================================================== --- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java (original) +++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/utils/discovery/UDPDiscoveryService.java Fri May 24 09:12:04 2019 @@ -300,8 +300,8 @@ public class UDPDiscoveryService */ public void startup() { - udpReceiverThread = new Thread( receiver ); - udpReceiverThread.setDaemon( true ); + udpReceiverThread = new Thread(receiver); + udpReceiverThread.setDaemon(true); // udpReceiverThread.setName( t.getName() + "--UDPReceiver" ); udpReceiverThread.start(); }