This is an automated email from the ASF dual-hosted git repository. tv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-jcs.git
commit 24b7a0e964d04b5078d4ee8de44b2032b01c7dc0 Author: Thomas Vandahl <[email protected]> AuthorDate: Tue Apr 14 18:43:36 2026 +0200 Improve robustness of background threads --- .../lateral/socket/tcp/LateralTCPListener.java | 103 +++++++-------- .../jcs4/engine/AbstractCacheEventQueue.java | 4 +- .../jcs4/utils/discovery/UDPDiscoveryReceiver.java | 32 ++--- .../jcs4/utils/discovery/UDPDiscoveryService.java | 144 +++++++++------------ 4 files changed, 123 insertions(+), 160 deletions(-) diff --git a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/lateral/socket/tcp/LateralTCPListener.java b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/lateral/socket/tcp/LateralTCPListener.java index 4017a860..021c861b 100644 --- a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/lateral/socket/tcp/LateralTCPListener.java +++ b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/auxiliary/lateral/socket/tcp/LateralTCPListener.java @@ -56,7 +56,7 @@ public class LateralTCPListener<K, V> private static final Log log = Log.getLog( LateralTCPListener.class ); /** How long the server will block on an accept(). 0 is infinite. */ - private static final Duration acceptTimeOut = Duration.ofMillis(1000); + private static final Duration acceptTimeOut = Duration.ofSeconds(1); /** Map of available instances, keyed by port */ private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances = @@ -131,7 +131,7 @@ public class LateralTCPListener<K, V> */ protected LateralTCPListener( final ILateralTCPCacheAttributes ilca, final IElementSerializer serializer ) { - this.setTcpLateralCacheAttributes( ilca ); + this.tcpLateralCacheAttributes = ilca; this.serializer = serializer; } @@ -171,7 +171,7 @@ public class LateralTCPListener<K, V> /** * @return the getCnt. */ - public int getGetCnt() + protected int getGetCnt() { return getCnt; } @@ -194,7 +194,7 @@ public class LateralTCPListener<K, V> * * @return the putCnt. */ - public int getPutCnt() + protected int getPutCnt() { return putCnt; } @@ -202,19 +202,11 @@ public class LateralTCPListener<K, V> /** * @return the removeCnt. */ - public int getRemoveCnt() + protected int getRemoveCnt() { return removeCnt; } - /** - * @return the tcpLateralCacheAttributes. - */ - public ILateralTCPCacheAttributes getTcpLateralCacheAttributes() - { - return tcpLateralCacheAttributes; - } - /** * A Separate thread that runs when a command comes into the LateralTCPReceiver. */ @@ -308,7 +300,7 @@ public class LateralTCPListener<K, V> // check to see if they are the same // if so, then don't remove, otherwise issue a remove if (led.valHashCode() != -1 && - getTcpLateralCacheAttributes().isFilterRemoveByHashCode()) + tcpLateralCacheAttributes.isFilterRemoveByHashCode()) { final ICacheElement<K, V> test = getCache( cacheName ).localGet( key ); if ( test != null ) @@ -364,7 +356,7 @@ public class LateralTCPListener<K, V> if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 ) { log.info( "Get Count (port {0}) = {1}", - () -> getTcpLateralCacheAttributes().getTcpListenerPort(), + () -> tcpLateralCacheAttributes.getTcpListenerPort(), this::getGetCnt); } @@ -400,7 +392,7 @@ public class LateralTCPListener<K, V> if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 ) { log.info( "GetMatching Count (port {0}) = {1}", - () -> getTcpLateralCacheAttributes().getTcpListenerPort(), + () -> tcpLateralCacheAttributes.getTcpListenerPort(), this::getGetCnt); } @@ -423,7 +415,7 @@ public class LateralTCPListener<K, V> if ( log.isInfoEnabled() && getPutCnt() % 100 == 0 ) { log.info( "Put Count (port {0}) = {1}", - () -> getTcpLateralCacheAttributes().getTcpListenerPort(), + () -> tcpLateralCacheAttributes.getTcpListenerPort(), this::getPutCnt); } @@ -477,8 +469,8 @@ public class LateralTCPListener<K, V> { try { - final int port = getTcpLateralCacheAttributes().getTcpListenerPort(); - final String host = getTcpLateralCacheAttributes().getTcpListenerHost(); + final int port = tcpLateralCacheAttributes.getTcpListenerPort(); + final String host = tcpLateralCacheAttributes.getTcpListenerHost(); terminated.set(false); shutdown.set(false); @@ -527,48 +519,55 @@ public class LateralTCPListener<K, V> // Check to see if we've been asked to exit, and exit while (!terminated.get()) { - final int activeKeys = selector.select(acceptTimeOut.toMillis()); - if (activeKeys == 0) - { - continue; - } - - for (final Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) + try { - if (terminated.get()) - { - break; - } - - final SelectionKey key = i.next(); - - if (!key.isValid()) + final int activeKeys = selector.select(acceptTimeOut.toMillis()); + if (activeKeys == 0) { continue; } - if (key.isAcceptable()) + for (final Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) { - final ServerSocketChannel server = (ServerSocketChannel) key.channel(); - final SocketChannel client = server.accept(); - if (client == null) + if (terminated.get()) + { + break; + } + + final SelectionKey key = i.next(); + + if (!key.isValid()) { - //may happen in non-blocking mode continue; } - log.info("Connected to client at {0}", client.getRemoteAddress()); + if (key.isAcceptable()) + { + final ServerSocketChannel server = (ServerSocketChannel) key.channel(); + final SocketChannel client = server.accept(); + if (client == null) + { + //may happen in non-blocking mode + continue; + } + + log.info("Connected to client at {0}", client.getRemoteAddress()); + + client.configureBlocking(false); + client.register(selector, SelectionKey.OP_READ); + } - client.configureBlocking(false); - client.register(selector, SelectionKey.OP_READ); - } + if (key.isReadable()) + { + handleClient(key); + } - if (key.isReadable()) - { - handleClient(key); + i.remove(); } - - i.remove(); + } + catch (Exception e) + { + log.error( "Exception occured handling client connection", e ); } } @@ -635,14 +634,6 @@ public class LateralTCPListener<K, V> log.debug( "set listenerId = {0}", id ); } - /** - * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set. - */ - public void setTcpLateralCacheAttributes( final ILateralTCPCacheAttributes tcpLateralCacheAttributes ) - { - this.tcpLateralCacheAttributes = tcpLateralCacheAttributes; - } - /** * Shuts down the receiver. */ diff --git a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/engine/AbstractCacheEventQueue.java b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/engine/AbstractCacheEventQueue.java index b40a14c6..3da70776 100644 --- a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/engine/AbstractCacheEventQueue.java +++ b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/engine/AbstractCacheEventQueue.java @@ -89,9 +89,7 @@ public abstract class AbstractCacheEventQueue<K, V> } } - log.warn( "Dropping Event and marking Event Queue {0} as " - + "non-functional.", this ); - destroy(); + log.warn("Dropping Event {0}.", this); } /** diff --git a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/utils/discovery/UDPDiscoveryReceiver.java index 038ae0eb..c8f366e8 100644 --- a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/utils/discovery/UDPDiscoveryReceiver.java +++ b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/utils/discovery/UDPDiscoveryReceiver.java @@ -190,7 +190,7 @@ public class UDPDiscoveryReceiver /** * @return the cnt. */ - public int getCnt() + protected int getCnt() { return cnt.get(); } @@ -227,11 +227,11 @@ public class UDPDiscoveryReceiver @Override public void run() { - try - { - log.debug( "Waiting for message." ); + log.debug( "Waiting for message." ); - while (!shutdown.get()) + while (!shutdown.get()) + { + try { final int activeKeys = selector.select(); if (activeKeys == 0) @@ -281,7 +281,7 @@ public class UDPDiscoveryReceiver msg.setHost(sourceAddress.getHostString()); log.debug( "Read object from address [{0}], object=[{1}]", - sourceAddress, obj ); + sourceAddress, msg ); pooledExecutor.execute(() -> handleMessage(msg)); log.debug( "Passed handler to executor." ); @@ -293,20 +293,12 @@ public class UDPDiscoveryReceiver } } } - } // end while - } - catch ( final IOException e ) - { - log.error( "Unexpected exception in UDP receiver.", e ); - } - } - - /** - * @param cnt The cnt to set. - */ - public void setCnt( final int cnt ) - { - this.cnt.set(cnt); + } + catch ( final Exception e ) + { + log.error( "Unexpected exception in UDP receiver.", e ); + } + } // end while } /** diff --git a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/utils/discovery/UDPDiscoveryService.java b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/utils/discovery/UDPDiscoveryService.java index 838d0541..f5ccb0c5 100644 --- a/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/utils/discovery/UDPDiscoveryService.java +++ b/commons-jcs4-core/src/main/java/org/apache/commons/jcs4/utils/discovery/UDPDiscoveryService.java @@ -33,6 +33,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -44,6 +46,7 @@ import org.apache.commons.jcs4.engine.behavior.IShutdownObserver; import org.apache.commons.jcs4.log.Log; import org.apache.commons.jcs4.utils.discovery.behavior.IDiscoveryListener; import org.apache.commons.jcs4.utils.net.HostNameUtil; +import org.apache.commons.jcs4.utils.threadpool.DaemonThreadFactory; /** * This service creates a listener that can create lateral caches and add them to the no wait list. @@ -60,8 +63,8 @@ public class UDPDiscoveryService /** The logger */ private static final Log log = Log.getLog( UDPDiscoveryService.class ); - /** Thread that listens for messages */ - private Thread udpReceiverThread; + /** Manage thread that listens for messages */ + private ExecutorService udpReceiverExecutor; /** The runnable that the receiver thread runs */ private UDPDiscoveryReceiver receiver; @@ -85,6 +88,9 @@ public class UDPDiscoveryService /** Set of listeners. */ private final Set<IDiscoveryListener> discoveryListeners = new CopyOnWriteArraySet<>(); + /** Detected multicast address */ + private InetAddress multicastAddress; + /** Handle to cancel the scheduled broadcast task */ private ScheduledFuture<?> broadcastTaskFuture; @@ -102,22 +108,23 @@ public class UDPDiscoveryService { this.udpDiscoveryAttributes = attributes; this.serializer = serializer; + this.udpReceiverExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("JCS-UDPReceiver-")); try { - final InetAddress multicastAddress = InetAddress.getByName( - getUdpDiscoveryAttributes().udpDiscoveryAddr()); + this.multicastAddress = InetAddress.getByName( + udpDiscoveryAttributes.udpDiscoveryAddr()); // Set service address if still empty - if (getUdpDiscoveryAttributes().serviceAddress() == null || - getUdpDiscoveryAttributes().serviceAddress().isEmpty()) + if (udpDiscoveryAttributes.serviceAddress() == null || + udpDiscoveryAttributes.serviceAddress().isEmpty()) { // Use same interface as for multicast NetworkInterface serviceInterface = null; - if (getUdpDiscoveryAttributes().udpDiscoveryInterface() != null) + if (udpDiscoveryAttributes.udpDiscoveryInterface() != null) { serviceInterface = NetworkInterface.getByName( - getUdpDiscoveryAttributes().udpDiscoveryInterface()); + udpDiscoveryAttributes.udpDiscoveryInterface()); } else { @@ -143,7 +150,8 @@ public class UDPDiscoveryService // if Multicast uses IPv6, try to publish our IPv6 address break; } - } else if (serviceAddress instanceof Inet4Address && + } + else if (serviceAddress instanceof Inet4Address && !serviceAddress.isLoopbackAddress() && !serviceAddress.isMulticastAddress() && serviceAddress.isSiteLocalAddress()) @@ -159,29 +167,21 @@ public class UDPDiscoveryService serviceAddress = HostNameUtil.getLocalHostLANAddress(); } - setUdpDiscoveryAttributes( - getUdpDiscoveryAttributes().withServiceAddress( - serviceAddress.getHostAddress())); + this.udpDiscoveryAttributes = udpDiscoveryAttributes.withServiceAddress( + serviceAddress.getHostAddress()); } catch ( final UnknownHostException e ) { log.error( "Couldn't get local host address", e ); } } - - // todo need some kind of recovery here. - receiver = new UDPDiscoveryReceiver( this::processMessage, - getUdpDiscoveryAttributes().udpDiscoveryInterface(), - multicastAddress, - getUdpDiscoveryAttributes().udpDiscoveryPort() ); - receiver.setSerializer(serializer); } catch ( final IOException e ) { - log.error( "Problem creating UDPDiscoveryReceiver, address [{0}] " + log.error( "Problem creating UDPDiscoveryService, address [{0}] " + "port [{1}] we won't be able to find any other caches", - getUdpDiscoveryAttributes().udpDiscoveryAddr(), - getUdpDiscoveryAttributes().udpDiscoveryPort(), e ); + udpDiscoveryAttributes.udpDiscoveryAddr(), + udpDiscoveryAttributes.udpDiscoveryPort(), e ); } // initiate sender broadcast @@ -196,7 +196,7 @@ public class UDPDiscoveryService */ public boolean addDiscoveryListener( final IDiscoveryListener listener ) { - return getDiscoveryListeners().add( listener ); + return discoveryListeners.add( listener ); } /** @@ -234,7 +234,7 @@ public class UDPDiscoveryService // If we don't do this, then if a region using the default config is initialized after notification, // it will never get the service in it's no wait list. // Leave it to the listeners to decide what to do. - getDiscoveryListeners().forEach(listener -> listener.addDiscoveredService(discoveredService)); + discoveryListeners.forEach(listener -> listener.addDiscoveredService(discoveredService)); } /** @@ -260,11 +260,11 @@ public class UDPDiscoveryService // the listeners need to be notified. getDiscoveredServices().stream() .filter(service -> { - if (now.isAfter(service.getLastHearFromTime().plus(getUdpDiscoveryAttributes().maxIdleTime()))) + if (now.isAfter(service.getLastHearFromTime().plus(udpDiscoveryAttributes.maxIdleTime()))) { log.info( "Removing service, since we haven't heard from it in " - + "{0} seconds. service = {1}", - getUdpDiscoveryAttributes().maxIdleTime().toSeconds(), service ); + + "{0}. service = {1}", + udpDiscoveryAttributes.maxIdleTime(), service ); return true; } @@ -285,14 +285,6 @@ public class UDPDiscoveryService return new ArrayList<>(cacheNames); } - /** - * @return the discoveryListeners - */ - public Set<IDiscoveryListener> getCopyOfDiscoveryListeners() - { - return new HashSet<>(getDiscoveryListeners()); - } - /** * @return the discoveredServices. */ @@ -301,49 +293,33 @@ public class UDPDiscoveryService return new HashSet<>(discoveredServices.values()); } - /** - * @return the discoveryListeners - */ - private Set<IDiscoveryListener> getDiscoveryListeners() - { - return discoveryListeners; - } - /** * Return the serializer implementation * * @return the serializer * @since 3.1 */ - public IElementSerializer getSerializer() + protected IElementSerializer getSerializer() { return serializer; } - /** - * @return the lca. - */ - public UDPDiscoveryAttributes getUdpDiscoveryAttributes() - { - return this.udpDiscoveryAttributes; - } - /** * Initial request that the other caches let it know their addresses. * * @since 3.1 */ - public void initiateBroadcast() + private void initiateBroadcast() { log.debug( "Creating sender for discoveryAddress = [{0}] and " + "discoveryPort = [{1}] myHostName = [{2}] and port = [{3}]", - () -> getUdpDiscoveryAttributes().udpDiscoveryAddr(), - () -> getUdpDiscoveryAttributes().udpDiscoveryPort(), - () -> getUdpDiscoveryAttributes().serviceAddress(), - () -> getUdpDiscoveryAttributes().servicePort() ); + udpDiscoveryAttributes::udpDiscoveryAddr, + udpDiscoveryAttributes::udpDiscoveryPort, + udpDiscoveryAttributes::serviceAddress, + udpDiscoveryAttributes::servicePort); try (UDPDiscoverySender sender = new UDPDiscoverySender( - getUdpDiscoveryAttributes(), getSerializer())) + udpDiscoveryAttributes, serializer)) { sender.requestBroadcast(); @@ -367,7 +343,7 @@ public class UDPDiscoveryService log.info( "Removing {0}", service ); } - getDiscoveryListeners().forEach(listener -> listener.removeDiscoveredService(service)); + discoveryListeners.forEach(listener -> listener.removeDiscoveredService(service)); } /** @@ -378,7 +354,7 @@ public class UDPDiscoveryService */ public boolean removeDiscoveryListener( final IDiscoveryListener listener ) { - return getDiscoveryListeners().remove( listener ); + return discoveryListeners.remove( listener ); } /** @@ -419,11 +395,11 @@ public class UDPDiscoveryService // create this connection each time. // more robust try (UDPDiscoverySender sender = new UDPDiscoverySender( - getUdpDiscoveryAttributes(), getSerializer())) + udpDiscoveryAttributes, serializer)) { sender.passiveBroadcast( - getUdpDiscoveryAttributes().serviceAddress(), - getUdpDiscoveryAttributes().servicePort(), + udpDiscoveryAttributes.serviceAddress(), + udpDiscoveryAttributes.servicePort(), getCacheNames() ); log.debug( "Called sender to issue a passive broadcast" ); @@ -432,8 +408,8 @@ public class UDPDiscoveryService { log.error( "Problem calling the UDP Discovery Sender, address [{0}] " + "port [{1}]", - getUdpDiscoveryAttributes().udpDiscoveryAddr(), - getUdpDiscoveryAttributes().udpDiscoveryPort(), e ); + udpDiscoveryAttributes.udpDiscoveryAddr(), + udpDiscoveryAttributes.udpDiscoveryPort(), e ); } } @@ -452,15 +428,7 @@ public class UDPDiscoveryService // delay and the idle time. this.cleanupTaskFuture = scheduledExecutor.scheduleAtFixedRate( this::cleanup, 0, - getUdpDiscoveryAttributes().maxIdleTime().toSeconds(), TimeUnit.SECONDS); - } - - /** - * @param attr The UDPDiscoveryAttributes to set. - */ - public void setUdpDiscoveryAttributes( final UDPDiscoveryAttributes attr ) - { - this.udpDiscoveryAttributes = attr; + udpDiscoveryAttributes.maxIdleTime().toSeconds(), TimeUnit.SECONDS); } /** @@ -481,6 +449,7 @@ public class UDPDiscoveryService cleanupTaskFuture.cancel(false); } + udpReceiverExecutor.shutdown(); if (receiver != null) { log.info( "Shutting down UDP discovery service receiver." ); @@ -508,11 +477,11 @@ public class UDPDiscoveryService // create this connection each time. // more robust try (UDPDiscoverySender sender = new UDPDiscoverySender( - getUdpDiscoveryAttributes(), getSerializer())) + udpDiscoveryAttributes, serializer)) { sender.removeBroadcast( - getUdpDiscoveryAttributes().serviceAddress(), - getUdpDiscoveryAttributes().servicePort(), + udpDiscoveryAttributes.serviceAddress(), + udpDiscoveryAttributes.servicePort(), getCacheNames() ); log.debug( "Called sender to issue a remove broadcast in shutdown." ); @@ -528,9 +497,22 @@ public class UDPDiscoveryService */ public void startup() { - udpReceiverThread = new Thread(receiver); - udpReceiverThread.setDaemon(true); - // udpReceiverThread.setName( t.getName() + "--UDPReceiver" ); - udpReceiverThread.start(); + try + { + this.receiver = new UDPDiscoveryReceiver( this::processMessage, + udpDiscoveryAttributes.udpDiscoveryInterface(), + multicastAddress, + udpDiscoveryAttributes.udpDiscoveryPort() ); + this.receiver.setSerializer(serializer); + } + catch ( final IOException e ) + { + log.error( "Problem creating UDPDiscoveryReceiver, address [{0}] " + + "port [{1}] we won't be able to find any other caches", + multicastAddress, + udpDiscoveryAttributes.udpDiscoveryPort(), e ); + } + + udpReceiverExecutor.execute(receiver); } }
