This is an automated email from the ASF dual-hosted git repository. tv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-jcs.git
commit a9cf3fc0daa2f7448884e9d46e1e410c2db7add0 Author: Thomas Vandahl <t...@apache.org> AuthorDate: Mon Feb 15 22:21:43 2021 +0100 Make IElementSerializer configurable through all levels --- .../lateral/socket/tcp/LateralTCPCacheFactory.java | 4 +- .../jcs3/utils/discovery/UDPDiscoveryManager.java | 29 ++++++++++++- .../jcs3/utils/discovery/UDPDiscoveryReceiver.java | 17 ++------ .../jcs3/utils/discovery/UDPDiscoverySender.java | 25 +++++++++++- .../jcs3/utils/discovery/UDPDiscoveryService.java | 47 +++++++++++++++++++--- 5 files changed, 99 insertions(+), 23 deletions(-) diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java index 69a0eca..fc08301 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java @@ -390,7 +390,9 @@ public class LateralTCPCacheFactory // get dereferenced, also we don't want one for every region. discovery = UDPDiscoveryManager.getInstance().getService( lac.getUdpDiscoveryAddr(), lac.getUdpDiscoveryPort(), - lac.getTcpListenerPort(), cacheMgr); + lac.getTcpListenerPort(), + cacheMgr, + elementSerializer); discovery.addParticipatingCacheName( lac.getCacheName() ); discovery.addDiscoveryListener( discoveryListener ); diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java index 1d51838..f07f4de 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java @@ -23,9 +23,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager; +import org.apache.commons.jcs3.engine.behavior.IElementSerializer; import org.apache.commons.jcs3.engine.behavior.IProvideScheduler; import org.apache.commons.jcs3.log.Log; import org.apache.commons.jcs3.log.LogManager; +import org.apache.commons.jcs3.utils.serialization.StandardSerializer; /** * This manages UDPDiscovery Services. We should end up with one service per Lateral Cache Manager @@ -60,6 +62,7 @@ public class UDPDiscoveryManager return INSTANCE; } + /** * Creates a service for the address and port if one doesn't exist already. * <p> @@ -71,11 +74,33 @@ public class UDPDiscoveryManager * @param servicePort * @param cacheMgr * @return UDPDiscoveryService + * @deprecated Specify serializer implementation explicitly */ + @Deprecated public UDPDiscoveryService getService( final String discoveryAddress, final int discoveryPort, final int servicePort, final ICompositeCacheManager cacheMgr ) { - final String key = discoveryAddress + ":" + discoveryPort + ":" + servicePort; + return getService(discoveryAddress, discoveryPort, servicePort, cacheMgr, new StandardSerializer()); + } + + /** + * Creates a service for the address and port if one doesn't exist already. + * <p> + * We need to key this using the listener port too. TODO think of making one discovery service + * work for multiple types of clients. + * <p> + * @param discoveryAddress + * @param discoveryPort + * @param servicePort + * @param cacheMgr + * @param serializer + * + * @return UDPDiscoveryService + */ + public UDPDiscoveryService getService( final String discoveryAddress, final int discoveryPort, + final int servicePort, final ICompositeCacheManager cacheMgr, final IElementSerializer serializer ) + { + final String key = String.join(":", discoveryAddress, String.valueOf(discoveryPort), String.valueOf(servicePort)); final UDPDiscoveryService service = services.computeIfAbsent(key, k -> { log.info( "Creating service for address:port:servicePort [{0}]", key ); @@ -85,7 +110,7 @@ public class UDPDiscoveryManager attributes.setUdpDiscoveryPort( discoveryPort ); attributes.setServicePort( servicePort ); - final UDPDiscoveryService newService = new UDPDiscoveryService( attributes ); + final UDPDiscoveryService newService = new UDPDiscoveryService(attributes, serializer); // register for shutdown notification cacheMgr.registerShutdownObserver( newService ); diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java index e269301..60b872c 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java @@ -19,9 +19,7 @@ package org.apache.commons.jcs3.utils.discovery; * under the License. */ -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.ObjectInputStream; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; @@ -32,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.jcs3.engine.CacheInfo; import org.apache.commons.jcs3.engine.behavior.IShutdownObserver; -import org.apache.commons.jcs3.io.ObjectInputStreamClassLoaderAware; import org.apache.commons.jcs3.log.Log; import org.apache.commons.jcs3.log.LogManager; import org.apache.commons.jcs3.utils.net.HostNameUtil; @@ -47,9 +44,6 @@ public class UDPDiscoveryReceiver /** The log factory */ private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class ); - /** buffer */ - private final byte[] mBuffer = new byte[65536]; - /** The socket used for communication. */ private MulticastSocket mSocket; @@ -159,7 +153,8 @@ public class UDPDiscoveryReceiver public Object waitForMessage() throws IOException { - final DatagramPacket packet = new DatagramPacket( mBuffer, mBuffer.length ); + final byte[] mBuffer = new byte[65536]; + final DatagramPacket packet = new DatagramPacket(mBuffer, mBuffer.length); Object obj = null; try { @@ -170,11 +165,7 @@ public class UDPDiscoveryReceiver log.debug( "Received packet from address [{0}]", () -> packet.getSocketAddress() ); - try (ByteArrayInputStream byteStream = new ByteArrayInputStream(mBuffer, 0, packet.getLength()); - ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware(byteStream, null)) - { - obj = objectStream.readObject(); - } + obj = service.getSerializer().deSerialize(mBuffer, null); if ( obj instanceof UDPDiscoveryMessage ) { @@ -188,7 +179,7 @@ public class UDPDiscoveryReceiver packet.getSocketAddress(), obj ); } } - catch ( final Exception e ) + catch ( final IOException | ClassNotFoundException e ) { log.error( "Error receiving multicast packet", e ); } diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java index b153401..b35be3c 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java @@ -27,6 +27,7 @@ import java.net.MulticastSocket; import java.util.ArrayList; import org.apache.commons.jcs3.engine.CacheInfo; +import org.apache.commons.jcs3.engine.behavior.IElementSerializer; import org.apache.commons.jcs3.log.Log; import org.apache.commons.jcs3.log.LogManager; import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryMessage.BroadcastType; @@ -52,7 +53,7 @@ public class UDPDiscoverySender implements AutoCloseable private final int multicastPort; /** Used to serialize messages */ - private final StandardSerializer serializer = new StandardSerializer(); + private final IElementSerializer serializer; /** * Constructor for the UDPDiscoverySender object @@ -65,10 +66,31 @@ public class UDPDiscoverySender implements AutoCloseable * @param port * @param udpTTL the Datagram packet time-to-live * @throws IOException + * @deprecated Specify serializer implementation explicitly */ + @Deprecated public UDPDiscoverySender( final String host, final int port, final int udpTTL ) throws IOException { + this(host, port, udpTTL, new StandardSerializer()); + } + + /** + * Constructor for the UDPDiscoverySender object + * <p> + * This sender can be used to send multiple messages. + * <p> + * When you are done sending, you should destroy the socket sender. + * <p> + * @param host + * @param port + * @param udpTTL the Datagram packet time-to-live + * @param serializer the Serializer to use when sending messages + * @throws IOException + */ + public UDPDiscoverySender( final String host, final int port, final int udpTTL, IElementSerializer serializer) + throws IOException + { try { log.debug( "Constructing socket for sender on port [{0}]", port ); @@ -89,6 +111,7 @@ public class UDPDiscoverySender implements AutoCloseable } this.multicastPort = port; + this.serializer = serializer; } /** diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java index bbe4b4e..8d08463 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java @@ -32,12 +32,14 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.jcs3.engine.behavior.IElementSerializer; import org.apache.commons.jcs3.engine.behavior.IRequireScheduler; import org.apache.commons.jcs3.engine.behavior.IShutdownObserver; import org.apache.commons.jcs3.log.Log; import org.apache.commons.jcs3.log.LogManager; import org.apache.commons.jcs3.utils.discovery.behavior.IDiscoveryListener; import org.apache.commons.jcs3.utils.net.HostNameUtil; +import org.apache.commons.jcs3.utils.serialization.StandardSerializer; /** * This service creates a listener that can create lateral caches and add them to the no wait list. @@ -63,8 +65,11 @@ public class UDPDiscoveryService /** attributes */ private UDPDiscoveryAttributes udpDiscoveryAttributes; + /** Used to serialize messages */ + private final IElementSerializer serializer; + /** is this shut down? */ - private AtomicBoolean shutdown = new AtomicBoolean(false); + private final AtomicBoolean shutdown = new AtomicBoolean(false); /** This is a set of services that have been discovered. */ private final ConcurrentMap<Integer, DiscoveredService> discoveredServices = new ConcurrentHashMap<>(); @@ -82,9 +87,24 @@ public class UDPDiscoveryService private ScheduledFuture<?> cleanupTaskFuture = null; /** - * @param attributes + * Constructor + * + * @param attributes settings of the service + * @deprecated Specify serializer implementation explicitly + */ + @Deprecated + public UDPDiscoveryService(final UDPDiscoveryAttributes attributes) + { + this(attributes, new StandardSerializer()); + } + + /** + * Constructor + * + * @param attributes settings of service + * @param serializer the serializer to use to send and receive messages */ - public UDPDiscoveryService( final UDPDiscoveryAttributes attributes) + public UDPDiscoveryService(final UDPDiscoveryAttributes attributes, IElementSerializer serializer) { udpDiscoveryAttributes = attributes.clone(); @@ -114,6 +134,8 @@ public class UDPDiscoveryService getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e ); } + this.serializer = serializer; + // initiate sender broadcast initiateBroadcast(); } @@ -177,7 +199,8 @@ public class UDPDiscoveryService try (UDPDiscoverySender sender = new UDPDiscoverySender( getUdpDiscoveryAttributes().getUdpDiscoveryAddr(), getUdpDiscoveryAttributes().getUdpDiscoveryPort(), - getUdpDiscoveryAttributes().getUdpTTL())) + getUdpDiscoveryAttributes().getUdpTTL(), + getSerializer())) { sender.requestBroadcast(); @@ -202,7 +225,8 @@ public class UDPDiscoveryService try (UDPDiscoverySender sender = new UDPDiscoverySender( getUdpDiscoveryAttributes().getUdpDiscoveryAddr(), getUdpDiscoveryAttributes().getUdpDiscoveryPort(), - getUdpDiscoveryAttributes().getUdpTTL())) + getUdpDiscoveryAttributes().getUdpTTL(), + getSerializer())) { sender.passiveBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), @@ -230,7 +254,8 @@ public class UDPDiscoveryService try (UDPDiscoverySender sender = new UDPDiscoverySender( getUdpDiscoveryAttributes().getUdpDiscoveryAddr(), getUdpDiscoveryAttributes().getUdpDiscoveryPort(), - getUdpDiscoveryAttributes().getUdpTTL())) + getUdpDiscoveryAttributes().getUdpTTL(), + getSerializer())) { sender.removeBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), @@ -335,6 +360,16 @@ public class UDPDiscoveryService } /** + * Return the serializer implementation + * + * @return the serializer + */ + public IElementSerializer getSerializer() + { + return serializer; + } + + /** * Start necessary receiver thread */ public void startup()