This is an automated email from the ASF dual-hosted git repository.

tv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-jcs.git

commit a9cf3fc0daa2f7448884e9d46e1e410c2db7add0
Author: Thomas Vandahl <t...@apache.org>
AuthorDate: Mon Feb 15 22:21:43 2021 +0100

    Make IElementSerializer configurable through all levels
---
 .../lateral/socket/tcp/LateralTCPCacheFactory.java |  4 +-
 .../jcs3/utils/discovery/UDPDiscoveryManager.java  | 29 ++++++++++++-
 .../jcs3/utils/discovery/UDPDiscoveryReceiver.java | 17 ++------
 .../jcs3/utils/discovery/UDPDiscoverySender.java   | 25 +++++++++++-
 .../jcs3/utils/discovery/UDPDiscoveryService.java  | 47 +++++++++++++++++++---
 5 files changed, 99 insertions(+), 23 deletions(-)

diff --git 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
index 69a0eca..fc08301 100644
--- 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
+++ 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
@@ -390,7 +390,9 @@ public class LateralTCPCacheFactory
             // get dereferenced, also we don't want one for every region.
             discovery = UDPDiscoveryManager.getInstance().getService( 
lac.getUdpDiscoveryAddr(),
                                                                       
lac.getUdpDiscoveryPort(),
-                                                                      
lac.getTcpListenerPort(), cacheMgr);
+                                                                      
lac.getTcpListenerPort(),
+                                                                      cacheMgr,
+                                                                      
elementSerializer);
 
             discovery.addParticipatingCacheName( lac.getCacheName() );
             discovery.addDiscoveryListener( discoveryListener );
diff --git 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
index 1d51838..f07f4de 100644
--- 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
+++ 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
@@ -23,9 +23,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
 import org.apache.commons.jcs3.engine.behavior.IProvideScheduler;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
 
 /**
  * This manages UDPDiscovery Services. We should end up with one service per 
Lateral Cache Manager
@@ -60,6 +62,7 @@ public class UDPDiscoveryManager
         return INSTANCE;
     }
 
+
     /**
      * Creates a service for the address and port if one doesn't exist already.
      * <p>
@@ -71,11 +74,33 @@ public class UDPDiscoveryManager
      * @param servicePort
      * @param cacheMgr
      * @return UDPDiscoveryService
+     * @deprecated Specify serializer implementation explicitly
      */
+    @Deprecated
     public UDPDiscoveryService getService( final String discoveryAddress, 
final int discoveryPort, final int servicePort,
                                                         final 
ICompositeCacheManager cacheMgr )
     {
-        final String key = discoveryAddress + ":" + discoveryPort + ":" + 
servicePort;
+        return getService(discoveryAddress, discoveryPort, servicePort, 
cacheMgr, new StandardSerializer());
+    }
+
+    /**
+     * Creates a service for the address and port if one doesn't exist already.
+     * <p>
+     * We need to key this using the listener port too. TODO think of making 
one discovery service
+     * work for multiple types of clients.
+     * <p>
+     * @param discoveryAddress
+     * @param discoveryPort
+     * @param servicePort
+     * @param cacheMgr
+     * @param serializer
+     *
+     * @return UDPDiscoveryService
+     */
+    public UDPDiscoveryService getService( final String discoveryAddress, 
final int discoveryPort,
+            final int servicePort, final ICompositeCacheManager cacheMgr, 
final IElementSerializer serializer )
+    {
+        final String key = String.join(":", discoveryAddress, 
String.valueOf(discoveryPort), String.valueOf(servicePort));
 
         final UDPDiscoveryService service = services.computeIfAbsent(key, k -> 
{
             log.info( "Creating service for address:port:servicePort [{0}]", 
key );
@@ -85,7 +110,7 @@ public class UDPDiscoveryManager
             attributes.setUdpDiscoveryPort( discoveryPort );
             attributes.setServicePort( servicePort );
 
-            final UDPDiscoveryService newService = new UDPDiscoveryService( 
attributes );
+            final UDPDiscoveryService newService = new 
UDPDiscoveryService(attributes, serializer);
 
             // register for shutdown notification
             cacheMgr.registerShutdownObserver( newService );
diff --git 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
index e269301..60b872c 100644
--- 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
+++ 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
@@ -19,9 +19,7 @@ package org.apache.commons.jcs3.utils.discovery;
  * under the License.
  */
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.net.DatagramPacket;
 import java.net.InetAddress;
 import java.net.MulticastSocket;
@@ -32,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.jcs3.engine.CacheInfo;
 import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
-import org.apache.commons.jcs3.io.ObjectInputStreamClassLoaderAware;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
 import org.apache.commons.jcs3.utils.net.HostNameUtil;
@@ -47,9 +44,6 @@ public class UDPDiscoveryReceiver
     /** The log factory */
     private static final Log log = LogManager.getLog( 
UDPDiscoveryReceiver.class );
 
-    /** buffer */
-    private final byte[] mBuffer = new byte[65536];
-
     /** The socket used for communication. */
     private MulticastSocket mSocket;
 
@@ -159,7 +153,8 @@ public class UDPDiscoveryReceiver
     public Object waitForMessage()
         throws IOException
     {
-        final DatagramPacket packet = new DatagramPacket( mBuffer, 
mBuffer.length );
+        final byte[] mBuffer = new byte[65536];
+        final DatagramPacket packet = new DatagramPacket(mBuffer, 
mBuffer.length);
         Object obj = null;
         try
         {
@@ -170,11 +165,7 @@ public class UDPDiscoveryReceiver
             log.debug( "Received packet from address [{0}]",
                     () -> packet.getSocketAddress() );
 
-            try (ByteArrayInputStream byteStream = new 
ByteArrayInputStream(mBuffer, 0, packet.getLength());
-                 ObjectInputStream objectStream = new 
ObjectInputStreamClassLoaderAware(byteStream, null))
-            {
-                obj = objectStream.readObject();
-            }
+            obj = service.getSerializer().deSerialize(mBuffer, null);
 
             if ( obj instanceof UDPDiscoveryMessage )
             {
@@ -188,7 +179,7 @@ public class UDPDiscoveryReceiver
                         packet.getSocketAddress(), obj );
             }
         }
-        catch ( final Exception e )
+        catch ( final IOException | ClassNotFoundException e )
         {
             log.error( "Error receiving multicast packet", e );
         }
diff --git 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
index b153401..b35be3c 100644
--- 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
+++ 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
@@ -27,6 +27,7 @@ import java.net.MulticastSocket;
 import java.util.ArrayList;
 
 import org.apache.commons.jcs3.engine.CacheInfo;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
 import 
org.apache.commons.jcs3.utils.discovery.UDPDiscoveryMessage.BroadcastType;
@@ -52,7 +53,7 @@ public class UDPDiscoverySender implements AutoCloseable
     private final int multicastPort;
 
     /** Used to serialize messages */
-    private final StandardSerializer serializer = new StandardSerializer();
+    private final IElementSerializer serializer;
 
     /**
      * Constructor for the UDPDiscoverySender object
@@ -65,10 +66,31 @@ public class UDPDiscoverySender implements AutoCloseable
      * @param port
      * @param udpTTL the Datagram packet time-to-live
      * @throws IOException
+     * @deprecated Specify serializer implementation explicitly
      */
+    @Deprecated
     public UDPDiscoverySender( final String host, final int port, final int 
udpTTL )
         throws IOException
     {
+        this(host, port, udpTTL, new StandardSerializer());
+    }
+
+    /**
+     * Constructor for the UDPDiscoverySender object
+     * <p>
+     * This sender can be used to send multiple messages.
+     * <p>
+     * When you are done sending, you should destroy the socket sender.
+     * <p>
+     * @param host
+     * @param port
+     * @param udpTTL the Datagram packet time-to-live
+     * @param serializer the Serializer to use when sending messages
+     * @throws IOException
+     */
+    public UDPDiscoverySender( final String host, final int port, final int 
udpTTL, IElementSerializer serializer)
+        throws IOException
+    {
         try
         {
             log.debug( "Constructing socket for sender on port [{0}]", port );
@@ -89,6 +111,7 @@ public class UDPDiscoverySender implements AutoCloseable
         }
 
         this.multicastPort = port;
+        this.serializer = serializer;
     }
 
     /**
diff --git 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
index bbe4b4e..8d08463 100644
--- 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
+++ 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
@@ -32,12 +32,14 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
 import org.apache.commons.jcs3.engine.behavior.IRequireScheduler;
 import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
 import org.apache.commons.jcs3.utils.discovery.behavior.IDiscoveryListener;
 import org.apache.commons.jcs3.utils.net.HostNameUtil;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
 
 /**
  * This service creates a listener that can create lateral caches and add them 
to the no wait list.
@@ -63,8 +65,11 @@ public class UDPDiscoveryService
     /** attributes */
     private UDPDiscoveryAttributes udpDiscoveryAttributes;
 
+    /** Used to serialize messages */
+    private final IElementSerializer serializer;
+
     /** is this shut down? */
-    private AtomicBoolean shutdown = new AtomicBoolean(false);
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
 
     /** This is a set of services that have been discovered. */
     private final ConcurrentMap<Integer, DiscoveredService> discoveredServices 
= new ConcurrentHashMap<>();
@@ -82,9 +87,24 @@ public class UDPDiscoveryService
     private ScheduledFuture<?> cleanupTaskFuture = null;
 
     /**
-     * @param attributes
+     * Constructor
+     *
+     * @param attributes settings of the service
+     * @deprecated Specify serializer implementation explicitly
+     */
+    @Deprecated
+    public UDPDiscoveryService(final UDPDiscoveryAttributes attributes)
+    {
+        this(attributes, new StandardSerializer());
+    }
+
+    /**
+     * Constructor
+     *
+     * @param attributes settings of service
+     * @param serializer the serializer to use to send and receive messages
      */
-    public UDPDiscoveryService( final UDPDiscoveryAttributes attributes)
+    public UDPDiscoveryService(final UDPDiscoveryAttributes attributes, 
IElementSerializer serializer)
     {
         udpDiscoveryAttributes = attributes.clone();
 
@@ -114,6 +134,8 @@ public class UDPDiscoveryService
                     getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e );
         }
 
+        this.serializer = serializer;
+
         // initiate sender broadcast
         initiateBroadcast();
     }
@@ -177,7 +199,8 @@ public class UDPDiscoveryService
         try (UDPDiscoverySender sender = new UDPDiscoverySender(
                 getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
                 getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
-                getUdpDiscoveryAttributes().getUdpTTL()))
+                getUdpDiscoveryAttributes().getUdpTTL(),
+                getSerializer()))
         {
             sender.requestBroadcast();
 
@@ -202,7 +225,8 @@ public class UDPDiscoveryService
         try (UDPDiscoverySender sender = new UDPDiscoverySender(
                 getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
                 getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
-                getUdpDiscoveryAttributes().getUdpTTL()))
+                getUdpDiscoveryAttributes().getUdpTTL(),
+                getSerializer()))
         {
             sender.passiveBroadcast(
                     getUdpDiscoveryAttributes().getServiceAddress(),
@@ -230,7 +254,8 @@ public class UDPDiscoveryService
         try (UDPDiscoverySender sender = new UDPDiscoverySender(
                 getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
                 getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
-                getUdpDiscoveryAttributes().getUdpTTL()))
+                getUdpDiscoveryAttributes().getUdpTTL(),
+                getSerializer()))
         {
             sender.removeBroadcast(
                     getUdpDiscoveryAttributes().getServiceAddress(),
@@ -335,6 +360,16 @@ public class UDPDiscoveryService
     }
 
     /**
+     * Return the serializer implementation
+     *
+     * @return the serializer
+     */
+    public IElementSerializer getSerializer()
+    {
+        return serializer;
+    }
+
+    /**
      * Start necessary receiver thread
      */
     public void startup()

Reply via email to