This is an automated email from the ASF dual-hosted git repository. tv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-jcs.git
The following commit(s) were added to refs/heads/master by this push: new 3ef95e5 Use Lambdas instead of Runnables, deprecate old inner classes 3ef95e5 is described below commit 3ef95e540e4708b816aed38940eba8cec08a2a23 Author: Thomas Vandahl <t...@apache.org> AuthorDate: Sun Jan 24 19:26:39 2021 +0100 Use Lambdas instead of Runnables, deprecate old inner classes --- .../lateral/socket/tcp/LateralTCPListener.java | 283 ++++++++++----------- .../jcs3/engine/AbstractCacheEventQueue.java | 37 ++- .../engine/control/event/ElementEventQueue.java | 68 +---- .../jcs3/utils/discovery/UDPCleanupRunner.java | 34 +-- .../jcs3/utils/discovery/UDPDiscoveryReceiver.java | 104 ++++---- 5 files changed, 232 insertions(+), 294 deletions(-) diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java index 3045253..8f457f8 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java @@ -19,7 +19,6 @@ package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp; * under the License. 
*/ -import java.io.EOFException; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -29,7 +28,6 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; -import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.Map; import java.util.Set; @@ -72,9 +70,6 @@ public class LateralTCPListener<K, V> private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances = new ConcurrentHashMap<>(); - /** The socket listener */ - private ListenerThread receiver; - /** Configuration attributes */ private ITCPLateralCacheAttributes tcpLateralCacheAttributes; @@ -122,8 +117,7 @@ public class LateralTCPListener<K, V> newIns.init(); newIns.setCacheManager( cacheMgr ); - log.info( "Created new listener {0}", - () -> ilca.getTcpListenerPort() ); + log.info("Created new listener {0}", () -> ilca.getTcpListenerPort()); return newIns; }); @@ -176,9 +170,7 @@ public class LateralTCPListener<K, V> } serverSocket.setSoTimeout( acceptTimeOut ); - receiver = new ListenerThread(serverSocket); - receiver.setDaemon( true ); - receiver.start(); + pooledExecutor.execute(() -> runListener(serverSocket)); } catch ( final IOException ex ) { @@ -444,7 +436,9 @@ public class LateralTCPListener<K, V> /** * Processes commands from the server socket. There should be one listener for each configured * TCP lateral. + * @deprecated No longer used */ + @Deprecated public class ListenerThread extends Thread { @@ -462,60 +456,62 @@ public class LateralTCPListener<K, V> } /** Main processing method for the ListenerThread object */ - @SuppressWarnings("synthetic-access") @Override public void run() { - try (ServerSocket ssck = serverSocket) + runListener(serverSocket); + } + } + + /** + * Processes commands from the server socket. There should be one listener for each configured + * TCP lateral. 
+ */ + private void runListener(final ServerSocket serverSocket) + { + try + { + while ( true ) { - ConnectionHandler handler; + log.debug( "Waiting for clients to connect " ); - outer: while ( true ) + // Check to see if we've been asked to exit, and exit + if (terminated.get()) { - log.debug( "Waiting for clients to connect " ); - - Socket socket = null; - inner: while (true) - { - // Check to see if we've been asked to exit, and exit - if (terminated.get()) - { - log.debug("Thread terminated, exiting gracefully"); - break outer; - } + log.debug("Thread terminated, exiting gracefully"); + break; + } - try - { - socket = ssck.accept(); - break inner; - } - catch (final SocketTimeoutException e) - { - // No problem! We loop back up! - continue inner; - } - } + try + { + final Socket socket = serverSocket.accept(); - if ( socket != null && log.isDebugEnabled() ) + if (socket != null) { - final InetAddress inetAddress = socket.getInetAddress(); - log.debug( "Connected to client at {0}", inetAddress ); + log.debug("Connected to client at {0}", () -> socket.getInetAddress()); } - handler = new ConnectionHandler( socket ); - pooledExecutor.execute( handler ); + pooledExecutor.execute(() -> handleConnection(socket)); + } + catch (final SocketTimeoutException e) + { + // No problem! We loop back up! } } - catch ( final IOException e ) - { - log.error( "Exception caught in TCP listener", e ); - } + + serverSocket.close(); + } + catch ( final IOException e ) + { + log.error( "Exception caught in TCP listener", e ); } } /** * A Separate thread that runs when a command comes into the LateralTCPReceiver. 
+ * @deprecated No longer used */ + @Deprecated public class ConnectionHandler implements Runnable { @@ -535,125 +531,127 @@ public class LateralTCPListener<K, V> * Main processing method for the LateralTCPReceiverConnection object */ @Override - @SuppressWarnings({"unchecked", // Need to cast from Object - "synthetic-access" }) public void run() { - try (ObjectInputStream ois = - new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null )) + handleConnection(socket); + } + } + + /** + * A Separate thread that runs when a command comes into the LateralTCPReceiver. + */ + private void handleConnection(final Socket socket) + { + try (ObjectInputStream ois = + new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null )) + { + while ( true ) { - while ( true ) - { - final LateralElementDescriptor<K, V> led = - (LateralElementDescriptor<K, V>) ois.readObject(); + @SuppressWarnings("unchecked") // Need to cast from Object + final LateralElementDescriptor<K, V> led = + (LateralElementDescriptor<K, V>) ois.readObject(); - if ( led == null ) - { - log.debug( "LateralElementDescriptor is null" ); - continue; - } - if ( led.requesterId == getListenerId() ) - { - log.debug( "from self" ); - } - else - { - log.debug( "receiving LateralElementDescriptor from another led = {0}", - led ); + if ( led == null ) + { + log.debug( "LateralElementDescriptor is null" ); + continue; + } + if ( led.requesterId == getListenerId() ) + { + log.debug( "from self" ); + } + else + { + log.debug( "receiving LateralElementDescriptor from another led = {0}", + led ); - handle( led ); - } + handleElement(led, socket); } } - catch ( final EOFException e ) - { - log.info( "Caught EOFException, closing connection.", e ); - } - catch ( final SocketException e ) - { - log.info( "Caught SocketException, closing connection.", e ); - } - catch ( final Exception e ) - { - log.error( "Unexpected exception.", e ); - } } - - /** - * This calls the appropriate method, based on the command 
sent in the Lateral element - * descriptor. - * <p> - * @param led - * @throws IOException - */ - @SuppressWarnings("synthetic-access") - private void handle( final LateralElementDescriptor<K, V> led ) - throws IOException + catch (final IOException e) { - final String cacheName = led.ce.getCacheName(); - final K key = led.ce.getKey(); - Serializable obj = null; + log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e); + } + catch (final ClassNotFoundException e) + { + log.error( "Deserialization failed reading from socket", e ); + } + } - switch (led.command) - { - case UPDATE: - handlePut( led.ce ); - break; + /** + * This calls the appropriate method, based on the command sent in the Lateral element + * descriptor. + * <p> + * @param led the lateral element + * @param socket the socket + * @throws IOException + */ + private void handleElement(final LateralElementDescriptor<K, V> led, Socket socket) throws IOException + { + final String cacheName = led.ce.getCacheName(); + final K key = led.ce.getKey(); + Serializable obj = null; - case REMOVE: - // if a hashcode was given and filtering is on - // check to see if they are the same - // if so, then don't remove, otherwise issue a remove - if ( led.valHashCode != -1 ) + switch (led.command) + { + case UPDATE: + handlePut( led.ce ); + break; + + case REMOVE: + // if a hashcode was given and filtering is on + // check to see if they are the same + // if so, then don't remove, otherwise issue a remove + if ( led.valHashCode != -1 ) + { + if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() ) { - if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() ) + final ICacheElement<K, V> test = getCache( cacheName ).localGet( key ); + if ( test != null ) { - final ICacheElement<K, V> test = getCache( cacheName ).localGet( key ); - if ( test != null ) + if ( test.getVal().hashCode() == led.valHashCode ) + { + log.debug( "Filtering detected identical hashCode [{0}], " + + "not issuing a 
remove for led {1}", + led.valHashCode, led ); + return; + } + else { - if ( test.getVal().hashCode() == led.valHashCode ) - { - log.debug( "Filtering detected identical hashCode [{0}], " - + "not issuing a remove for led {1}", - led.valHashCode, led ); - return; - } - else - { - log.debug( "Different hashcodes, in cache [{0}] sent [{1}]", - test.getVal().hashCode(), led.valHashCode ); - } + log.debug( "Different hashcodes, in cache [{0}] sent [{1}]", + test.getVal().hashCode(), led.valHashCode ); } } } - handleRemove( cacheName, key ); - break; + } + handleRemove( cacheName, key ); + break; - case REMOVEALL: - handleRemoveAll( cacheName ); - break; + case REMOVEALL: + handleRemoveAll( cacheName ); + break; - case GET: - obj = handleGet( cacheName, key ); - break; + case GET: + obj = handleGet( cacheName, key ); + break; - case GET_MATCHING: - obj = (Serializable) handleGetMatching( cacheName, (String) key ); - break; + case GET_MATCHING: + obj = (Serializable) handleGetMatching( cacheName, (String) key ); + break; - case GET_KEYSET: - obj = (Serializable) handleGetKeySet(cacheName); - break; + case GET_KEYSET: + obj = (Serializable) handleGetKeySet(cacheName); + break; - default: break; - } + default: break; + } - if (obj != null) - { - final ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() ); - oos.writeObject( obj ); - oos.flush(); - } + if (obj != null) + { + final ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() ); + oos.writeObject( obj ); + oos.flush(); } } @@ -666,8 +664,7 @@ public class LateralTCPListener<K, V> if ( shutdown.compareAndSet(false, true) ) { log.info( "Shutting down TCP Lateral receiver." 
); - - receiver.interrupt(); + pooledExecutor.shutdownNow(); } else { diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/AbstractCacheEventQueue.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/AbstractCacheEventQueue.java index 6055a82..f131313 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/AbstractCacheEventQueue.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/AbstractCacheEventQueue.java @@ -205,45 +205,40 @@ public abstract class AbstractCacheEventQueue<K, V> */ protected abstract class AbstractCacheEvent implements Runnable { - /** Number of failures encountered processing this event. */ - int failures; - /** * Main processing method for the AbstractCacheEvent object */ @Override - @SuppressWarnings("synthetic-access") public void run() { - try - { - doRun(); - } - catch ( final IOException e ) + for (int failures = 0; failures < maxFailure; failures++) { - log.warn( e ); - if ( ++failures >= maxFailure ) + try { - log.warn( "Error while running event from Queue: {0}. " - + "Dropping Event and marking Event Queue as " - + "non-functional.", this ); - destroy(); + doRun(); return; } - log.info( "Error while running event from Queue: {0}. " - + "Retrying...", this ); + catch (final IOException e) + { + log.warn("Error while running event from Queue: {0}. 
" + + "Retrying...", this, e); + } + try { Thread.sleep( waitBeforeRetry ); - run(); } catch ( final InterruptedException ie ) { - log.warn( "Interrupted while sleeping for retry on event " - + "{0}.", this ); - destroy(); + log.warn("Interrupted while sleeping for retry on event " + + "{0}.", this, ie); + break; } } + + log.warn( "Dropping Event and marking Event Queue {0} as " + + "non-functional.", this ); + destroy(); } /** diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/event/ElementEventQueue.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/event/ElementEventQueue.java index e59c611..fd85be2 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/event/ElementEventQueue.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/event/ElementEventQueue.java @@ -21,6 +21,7 @@ package org.apache.commons.jcs3.engine.control.event; import java.io.IOException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.jcs3.engine.control.event.behavior.IElementEvent; import org.apache.commons.jcs3.engine.control.event.behavior.IElementEventHandler; @@ -28,8 +29,8 @@ import org.apache.commons.jcs3.engine.control.event.behavior.IElementEventQueue; import org.apache.commons.jcs3.log.Log; import org.apache.commons.jcs3.log.LogManager; import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration; -import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager; import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy; +import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager; /** * An event queue is used to propagate ordered cache events to one and only one target listener. 
@@ -43,7 +44,7 @@ public class ElementEventQueue private static final Log log = LogManager.getLog( ElementEventQueue.class ); /** shutdown or not */ - private boolean destroyed; + private AtomicBoolean destroyed = new AtomicBoolean(false); /** The worker thread pool. */ private ExecutorService queueProcessor; @@ -65,15 +66,10 @@ public class ElementEventQueue @Override public void dispose() { - if ( !destroyed ) + if (destroyed.compareAndSet(false, true)) { - destroyed = true; - - // synchronize on queue so the thread will not wait forever, - // and then interrupt the QueueProcessor + // shut down the QueueProcessor queueProcessor.shutdownNow(); - queueProcessor = null; - log.info( "Element event queue destroyed: {0}", this ); } } @@ -89,19 +85,15 @@ public class ElementEventQueue throws IOException { - log.debug( "Adding Event Handler to QUEUE, !destroyed = {0}", !destroyed ); + log.debug("Adding Event Handler to QUEUE, !destroyed = {0}", !destroyed.get()); - if (destroyed) + if (destroyed.get()) { log.warn("Event submitted to disposed element event queue {0}", event); } else { - final ElementEventRunner runner = new ElementEventRunner( hand, event ); - - log.debug( "runner = {0}", runner ); - - queueProcessor.execute(runner); + queueProcessor.execute(() -> hand.handleElementEvent(event)); } } @@ -109,14 +101,15 @@ public class ElementEventQueue /** * Retries before declaring failure. + * @deprecated No longer used */ + @Deprecated protected abstract class AbstractElementEventRunner implements Runnable { /** * Main processing method for the AbstractElementEvent object */ - @SuppressWarnings("synthetic-access") @Override public void run() { @@ -140,45 +133,4 @@ public class ElementEventQueue protected abstract void doRun() throws IOException; } - - /** - * ElementEventRunner. 
- */ - private class ElementEventRunner - extends AbstractElementEventRunner - { - /** the handler */ - private final IElementEventHandler hand; - - /** event */ - private final IElementEvent<?> event; - - /** - * Constructor for the PutEvent object. - * <p> - * @param hand - * @param event - * @throws IOException - */ - @SuppressWarnings("synthetic-access") - ElementEventRunner( final IElementEventHandler hand, final IElementEvent<?> event ) - throws IOException - { - log.debug( "Constructing {0}", this ); - this.hand = hand; - this.event = event; - } - - /** - * Tells the handler to handle the event. - * <p> - * @throws IOException - */ - @Override - protected void doRun() - throws IOException - { - hand.handleElementEvent( event ); - } - } } diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java index e94f5ac..647dc2d 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java @@ -19,9 +19,6 @@ package org.apache.commons.jcs3.utils.discovery; * under the License. */ -import java.util.HashSet; -import java.util.Set; - import org.apache.commons.jcs3.log.Log; import org.apache.commons.jcs3.log.LogManager; @@ -66,27 +63,20 @@ public class UDPCleanupRunner { final long now = System.currentTimeMillis(); - // iterate through the set - // it is thread safe - // TODO this should get a copy. you can't simply remove from this. // the listeners need to be notified. - final Set<DiscoveredService> toRemove = new HashSet<>(); - // can't remove via the iterator. 
must remove directly - for (final DiscoveredService service : discoveryService.getDiscoveredServices()) - { - if ( ( now - service.getLastHearFromTime() ) > ( maxIdleTimeSeconds * 1000 ) ) - { - log.info( "Removing service, since we haven't heard from it in " - + "{0} seconds. service = {1}", maxIdleTimeSeconds, service ); - toRemove.add( service ); - } - } + discoveryService.getDiscoveredServices().stream() + .filter(service -> { + if (now - service.getLastHearFromTime() > maxIdleTimeSeconds * 1000) + { + log.info( "Removing service, since we haven't heard from it in " + + "{0} seconds. service = {1}", maxIdleTimeSeconds, service ); + return true; + } - // remove the bad ones - for (final DiscoveredService service : toRemove) - { + return false; + }) + // remove the bad ones // call this so the listeners get notified - discoveryService.removeDiscoveredService( service ); - } + .forEach(service -> discoveryService.removeDiscoveredService(service)); } } diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java index 3f03ca5..5e71d63 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java @@ -37,8 +37,8 @@ import org.apache.commons.jcs3.log.LogManager; import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryMessage.BroadcastType; import org.apache.commons.jcs3.utils.net.HostNameUtil; import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration; -import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager; import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy; +import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager; /** Receives UDP Discovery messages. 
*/ public class UDPDiscoveryReceiver @@ -210,18 +210,13 @@ public class UDPDiscoveryReceiver log.debug( "{0} messages received.", this::getCnt ); - UDPDiscoveryMessage message = null; - try { - message = (UDPDiscoveryMessage) obj; + UDPDiscoveryMessage message = (UDPDiscoveryMessage) obj; // check for null if ( message != null ) { - final MessageHandler handler = new MessageHandler( message ); - - pooledExecutor.execute( handler ); - + pooledExecutor.execute(() -> handleMessage(message)); log.debug( "Passed handler to executor." ); } else @@ -269,7 +264,9 @@ public class UDPDiscoveryReceiver /** * Separate thread run when a command comes into the UDPDiscoveryReceiver. + * @deprecated No longer used */ + @Deprecated public class MessageHandler implements Runnable { @@ -287,62 +284,69 @@ public class UDPDiscoveryReceiver /** * Process the message. */ - @SuppressWarnings("synthetic-access") @Override public void run() { - // consider comparing ports here instead. - if ( message.getRequesterId() == CacheInfo.listenerId ) - { - log.debug( "Ignoring message sent from self" ); - } - else - { - log.debug( "Process message sent from another" ); - log.debug( "Message = {0}", message ); - - if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() ) - { - log.debug( "Ignoring invalid message: {0}", message ); - } - else - { - processMessage(); - } - } + handleMessage(message); } - /** - * Process the incoming message. - */ - @SuppressWarnings("synthetic-access") - private void processMessage() + } + + /** + * Separate thread run when a command comes into the UDPDiscoveryReceiver. + */ + private void handleMessage(UDPDiscoveryMessage message) + { + // consider comparing ports here instead. 
+ if ( message.getRequesterId() == CacheInfo.listenerId ) { - final DiscoveredService discoveredService = new DiscoveredService(); - discoveredService.setServiceAddress( message.getHost() ); - discoveredService.setCacheNames( message.getCacheNames() ); - discoveredService.setServicePort( message.getPort() ); - discoveredService.setLastHearFromTime( System.currentTimeMillis() ); - - // if this is a request message, have the service handle it and - // return - if ( message.getMessageType() == BroadcastType.REQUEST ) - { - log.debug( "Message is a Request Broadcast, will have the service handle it." ); - service.serviceRequestBroadcast(); - } - else if ( message.getMessageType() == BroadcastType.REMOVE ) + log.debug( "Ignoring message sent from self" ); + } + else + { + log.debug( "Process message sent from another" ); + log.debug( "Message = {0}", message ); + + if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() ) { - log.debug( "Removing service from set {0}", discoveredService ); - service.removeDiscoveredService( discoveredService ); + log.debug( "Ignoring invalid message: {0}", message ); } else { - service.addOrUpdateService( discoveredService ); + processMessage(message); } } } + /** + * Process the incoming message. + */ + private void processMessage(UDPDiscoveryMessage message) + { + final DiscoveredService discoveredService = new DiscoveredService(); + discoveredService.setServiceAddress( message.getHost() ); + discoveredService.setCacheNames( message.getCacheNames() ); + discoveredService.setServicePort( message.getPort() ); + discoveredService.setLastHearFromTime( System.currentTimeMillis() ); + + // if this is a request message, have the service handle it and + // return + if ( message.getMessageType() == BroadcastType.REQUEST ) + { + log.debug( "Message is a Request Broadcast, will have the service handle it." 
); + service.serviceRequestBroadcast(); + } + else if ( message.getMessageType() == BroadcastType.REMOVE ) + { + log.debug( "Removing service from set {0}", discoveredService ); + service.removeDiscoveredService( discoveredService ); + } + else + { + service.addOrUpdateService( discoveredService ); + } + } + /** Shuts down the socket. */ @Override public void shutdown()