This is an automated email from the ASF dual-hosted git repository.

tv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-jcs.git

commit 727172b3c84da21ade3e7118080365db87b1c3ea
Author: Thomas Vandahl <[email protected]>
AuthorDate: Sat Mar 27 17:46:16 2021 +0100

    Use NIO and Selector, make IPv6 multicast work
---
 .../jcs3/utils/discovery/UDPDiscoveryReceiver.java | 213 +++++++++++++--------
 .../jcs3/utils/discovery/UDPDiscoverySender.java   |  46 ++++-
 .../jcs3/utils/discovery/UDPDiscoveryService.java  |  20 +-
 .../discovery/UDPDiscoverySenderUnitTest.java      |   7 +-
 .../discovery/UDPDiscoveryServiceUnitTest.java     |  78 ++------
 .../jcs3/utils/discovery/UDPDiscoveryUnitTest.java |  84 ++++----
 6 files changed, 259 insertions(+), 189 deletions(-)

diff --git 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
index 66a5754..b1b31c5 100644
--- 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
+++ 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
@@ -20,15 +20,25 @@ package org.apache.commons.jcs3.utils.discovery;
  */
 
 import java.io.IOException;
-import java.net.DatagramPacket;
+import java.net.Inet6Address;
 import java.net.InetAddress;
-import java.net.MulticastSocket;
+import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
+import java.net.StandardProtocolFamily;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.MembershipKey;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.jcs3.engine.CacheInfo;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
 import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
@@ -44,8 +54,14 @@ public class UDPDiscoveryReceiver
     /** The log factory */
     private static final Log log = LogManager.getLog( 
UDPDiscoveryReceiver.class );
 
-    /** The socket used for communication. */
-    private MulticastSocket mSocket;
+    /** The channel used for communication. */
+    private DatagramChannel multicastChannel;
+
+    /** The group membership key. */
+    private MembershipKey multicastGroupKey;
+
+    /** The selector. */
+    private Selector selector;
 
     /**
      * TODO: Consider using the threadpool manager to get this thread pool. 
For now place a tight
@@ -62,8 +78,8 @@ public class UDPDiscoveryReceiver
     /** Service to get cache names and handle request broadcasts */
     private final UDPDiscoveryService service;
 
-    /** Multicast address */
-    private final InetAddress multicastAddress;
+    /** Serializer */
+    private IElementSerializer serializer;
 
     /** Is it shutdown. */
     private AtomicBoolean shutdown = new AtomicBoolean(false);
@@ -84,7 +100,11 @@ public class UDPDiscoveryReceiver
         throws IOException
     {
         this.service = service;
-        this.multicastAddress = InetAddress.getByName( multicastAddressString 
);
+        if (service != null)
+        {
+            this.serializer = service.getSerializer();
+        }
+        InetAddress multicastAddress = InetAddress.getByName( 
multicastAddressString );
 
         // create a small thread pool to handle a barrage
         this.pooledExecutor = ThreadPoolManager.getInstance().createPool(
@@ -93,7 +113,6 @@ public class UDPDiscoveryReceiver
                        "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY);
 
         log.info( "Constructing listener, [{0}:{1}]", multicastAddress, 
multicastPort );
-
         createSocket( multicastInterfaceString, multicastAddress, 
multicastPort );
     }
 
@@ -111,12 +130,6 @@ public class UDPDiscoveryReceiver
     {
         try
         {
-            mSocket = new MulticastSocket( multicastPort );
-            if (log.isInfoEnabled())
-            {
-                log.info( "Joining Group: [{0}]", multicastAddress );
-            }
-
             // Use dedicated interface if specified
             NetworkInterface multicastInterface = null;
             if (multicastInterfaceString != null)
@@ -127,13 +140,20 @@ public class UDPDiscoveryReceiver
             {
                 multicastInterface = 
HostNameUtil.getMulticastNetworkInterface();
             }
-            if (multicastInterface != null)
-            {
-                log.info("Using network interface {0}", 
multicastInterface.getDisplayName());
-                mSocket.setNetworkInterface(multicastInterface);
-            }
 
-            mSocket.joinGroup( multicastAddress );
+            multicastChannel = DatagramChannel.open(
+                    multicastAddress instanceof Inet6Address ?
+                            StandardProtocolFamily.INET6 : 
StandardProtocolFamily.INET)
+                    .setOption(StandardSocketOptions.SO_REUSEADDR, true)
+                    .setOption(StandardSocketOptions.IP_MULTICAST_IF, 
multicastInterface)
+                    .bind(new InetSocketAddress(multicastPort));
+            multicastChannel.configureBlocking(false);
+
+            log.info("Joining Group: [{0}] on {1}", multicastAddress, 
multicastInterface);
+            multicastGroupKey = multicastChannel.join(multicastAddress, 
multicastInterface);
+
+            selector = Selector.open();
+            multicastChannel.register(selector, SelectionKey.OP_READ);
         }
         catch ( final IOException e )
         {
@@ -143,97 +163,120 @@ public class UDPDiscoveryReceiver
         }
     }
 
+    private final ArrayBlockingQueue<UDPDiscoveryMessage> msgQueue =
+            new ArrayBlockingQueue<>(maxPoolSize);
+
     /**
-     * Highly unreliable. If it is processing one message while another comes 
in, the second
-     * message is lost. This is for low concurrency peppering.
-     * <p>
+     * Wait for multicast message
+     *
      * @return the object message
      * @throws IOException
+     * @deprecated no longer used
      */
+    @Deprecated
     public Object waitForMessage()
         throws IOException
     {
-        final byte[] mBuffer = new byte[65536];
-        final DatagramPacket packet = new DatagramPacket(mBuffer, 
mBuffer.length);
-        Object obj = null;
         try
         {
-            log.debug( "Waiting for message." );
-
-            mSocket.receive( packet );
-
-            log.debug( "Received packet from address [{0}]",
-                    () -> packet.getSocketAddress() );
-
-            obj = service.getSerializer().deSerialize(mBuffer, null);
-
-            if ( obj instanceof UDPDiscoveryMessage )
-            {
-               // Ensure that the address we're supposed to send to is, 
indeed, the address
-               // of the machine on the other end of this connection.  This 
guards against
-               // instances where we don't exactly get the right local host 
address
-               final UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj;
-               msg.setHost(packet.getAddress().getHostAddress());
-
-                log.debug( "Read object from address [{0}], object=[{1}]",
-                        packet.getSocketAddress(), obj );
-            }
+            return msgQueue.take();
         }
-        catch ( final IOException | ClassNotFoundException e )
+        catch (InterruptedException e)
         {
-            log.error( "Error receiving multicast packet", e );
+            throw new IOException("Interrupted waiting for message", e);
         }
-
-        return obj;
     }
 
-    /** Main processing method for the LateralUDPReceiver object */
+    /** Main processing method for the UDPDiscoveryReceiver object */
     @Override
     public void run()
     {
         try
         {
+            ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
+            log.debug( "Waiting for message." );
+
             while (!shutdown.get())
             {
-                final Object obj = waitForMessage();
+                int activeKeys = selector.select();
+                if (activeKeys == 0)
+                {
+                    continue;
+                }
 
-                cnt.incrementAndGet();
+                for (Iterator<SelectionKey> i = 
selector.selectedKeys().iterator(); i.hasNext();)
+                {
+                    if (shutdown.get())
+                    {
+                        break;
+                    }
 
-                log.debug( "{0} messages received.", this::getCnt );
+                    SelectionKey key = i.next();
 
-                try
-                {
-                    UDPDiscoveryMessage message = (UDPDiscoveryMessage) obj;
-                    // check for null
-                    if ( message != null )
+                    if (!key.isValid())
                     {
-                        pooledExecutor.execute(() -> handleMessage(message));
-                        log.debug( "Passed handler to executor." );
+                        continue;
                     }
-                    else
+
+                    if (key.isReadable())
                     {
-                        log.warn( "message is null" );
+                        cnt.incrementAndGet();
+                        log.debug( "{0} messages received.", this::getCnt );
+
+                        DatagramChannel mc = (DatagramChannel) key.channel();
+
+                        byteBuffer.clear();
+                        InetSocketAddress sourceAddress =
+                                (InetSocketAddress) mc.receive(byteBuffer);
+                        byteBuffer.flip();
+
+                        try
+                        {
+                            log.debug("Received packet from address [{0}]", 
sourceAddress);
+
+                            Object obj = 
serializer.deSerialize(byteBuffer.array(), null);
+
+                            if (obj instanceof UDPDiscoveryMessage)
+                            {
+                                // Ensure that the address we're supposed to 
send to is, indeed, the address
+                                // of the machine on the other end of this 
connection.  This guards against
+                                // instances where we don't exactly get the 
right local host address
+                                final UDPDiscoveryMessage msg = 
(UDPDiscoveryMessage) obj;
+                                msg.setHost(sourceAddress.getHostString());
+
+                                log.debug( "Read object from address [{0}], 
object=[{1}]",
+                                        sourceAddress, obj );
+
+                                // Just to keep the functionality of the 
deprecated waitForMessage method
+                                synchronized (msgQueue)
+                                {
+                                    // Check if queue full already?
+                                    if (msgQueue.size() == maxPoolSize)
+                                    {
+                                        // remove oldest element from queue
+                                        msgQueue.remove();
+                                    }
+
+                                    msgQueue.add(msg);
+                                }
+
+                                pooledExecutor.execute(() -> 
handleMessage(msg));
+                                log.debug( "Passed handler to executor." );
+                            }
+                        }
+                        catch ( final IOException | ClassNotFoundException e )
+                        {
+                            log.error( "Error receiving multicast packet", e );
+                        }
+
+                        i.remove();
                     }
                 }
-                catch ( final ClassCastException cce )
-                {
-                    log.warn( "Received unknown message type", 
cce.getMessage() );
-                }
             } // end while
         }
         catch ( final IOException e )
         {
             log.error( "Unexpected exception in UDP receiver.", e );
-            try
-            {
-                Thread.sleep( 100 );
-                // TODO consider some failure count so we don't do this
-                // forever.
-            }
-            catch ( final InterruptedException e2 )
-            {
-                log.error( "Problem sleeping", e2 );
-            }
         }
     }
 
@@ -254,6 +297,16 @@ public class UDPDiscoveryReceiver
     }
 
     /**
+     * For testing
+     *
+     * @param serializer the serializer to set
+     */
+    protected void setSerializer(IElementSerializer serializer)
+    {
+        this.serializer = serializer;
+    }
+
+    /**
      * Separate thread run when a command comes into the UDPDiscoveryReceiver.
      * @deprecated No longer used
      */
@@ -280,7 +333,6 @@ public class UDPDiscoveryReceiver
         {
             handleMessage(message);
         }
-
     }
 
     /**
@@ -343,8 +395,9 @@ public class UDPDiscoveryReceiver
         {
             try
             {
-                mSocket.leaveGroup( multicastAddress );
-                mSocket.close();
+                selector.close();
+                multicastGroupKey.drop();
+                multicastChannel.close();
             }
             catch ( final IOException e )
             {
diff --git 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
index b35be3c..97ab1bd 100644
--- 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
+++ 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.InetAddress;
 import java.net.MulticastSocket;
+import java.net.NetworkInterface;
 import java.util.ArrayList;
 
 import org.apache.commons.jcs3.engine.CacheInfo;
@@ -31,6 +32,7 @@ import 
org.apache.commons.jcs3.engine.behavior.IElementSerializer;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
 import 
org.apache.commons.jcs3.utils.discovery.UDPDiscoveryMessage.BroadcastType;
+import org.apache.commons.jcs3.utils.net.HostNameUtil;
 import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
 
 /**
@@ -72,7 +74,7 @@ public class UDPDiscoverySender implements AutoCloseable
     public UDPDiscoverySender( final String host, final int port, final int 
udpTTL )
         throws IOException
     {
-        this(host, port, udpTTL, new StandardSerializer());
+        this(null, host, port, udpTTL, new StandardSerializer());
     }
 
     /**
@@ -82,19 +84,43 @@ public class UDPDiscoverySender implements AutoCloseable
      * <p>
      * When you are done sending, you should destroy the socket sender.
      * <p>
+     * @param udpDiscoveryAttributes configuration object
+     * @param serializer the Serializer to use when sending messages
+     * @throws IOException
+     */
+    public UDPDiscoverySender(final UDPDiscoveryAttributes 
udpDiscoveryAttributes, final IElementSerializer serializer)
+        throws IOException
+    {
+        this(udpDiscoveryAttributes.getUdpDiscoveryInterface(),
+            udpDiscoveryAttributes.getUdpDiscoveryAddr(),
+            udpDiscoveryAttributes.getUdpDiscoveryPort(),
+            udpDiscoveryAttributes.getUdpTTL(),
+            serializer);
+    }
+
+    /**
+     * Constructor for the UDPDiscoverySender object
+     * <p>
+     * This sender can be used to send multiple messages.
+     * <p>
+     * When you are done sending, you should destroy the socket sender.
+     * <p>
+     * @param mcastInterface the Multicast interface name to use, if null, try 
to autodetect
      * @param host
      * @param port
      * @param udpTTL the Datagram packet time-to-live
      * @param serializer the Serializer to use when sending messages
      * @throws IOException
      */
-    public UDPDiscoverySender( final String host, final int port, final int 
udpTTL, IElementSerializer serializer)
+    public UDPDiscoverySender(final String mcastInterface, final String host,
+            final int port, final int udpTTL, IElementSerializer serializer)
         throws IOException
     {
         try
         {
             log.debug( "Constructing socket for sender on port [{0}]", port );
             localSocket = new MulticastSocket( port );
+
             if (udpTTL > 0)
             {
                 log.debug( "Setting datagram TTL to [{0}]", udpTTL );
@@ -103,6 +129,22 @@ public class UDPDiscoverySender implements AutoCloseable
 
             // Remote address.
             multicastAddress = InetAddress.getByName( host );
+
+            // Use dedicated interface if specified
+            NetworkInterface multicastInterface = null;
+            if (mcastInterface != null)
+            {
+                multicastInterface = 
NetworkInterface.getByName(mcastInterface);
+            }
+            else
+            {
+                multicastInterface = 
HostNameUtil.getMulticastNetworkInterface();
+            }
+            if (multicastInterface != null)
+            {
+                log.info("Using network interface {0}", 
multicastInterface.getDisplayName());
+                localSocket.setNetworkInterface(multicastInterface);
+            }
         }
         catch ( final IOException e )
         {
diff --git 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
index 8d08463..bb7b9e5 100644
--- 
a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
+++ 
b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
@@ -106,7 +106,8 @@ public class UDPDiscoveryService
      */
     public UDPDiscoveryService(final UDPDiscoveryAttributes attributes, 
IElementSerializer serializer)
     {
-        udpDiscoveryAttributes = attributes.clone();
+        this.udpDiscoveryAttributes = attributes.clone();
+        this.serializer = serializer;
 
         try
         {
@@ -134,8 +135,6 @@ public class UDPDiscoveryService
                     getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e );
         }
 
-        this.serializer = serializer;
-
         // initiate sender broadcast
         initiateBroadcast();
     }
@@ -197,10 +196,7 @@ public class UDPDiscoveryService
                 () -> getUdpDiscoveryAttributes().getServicePort() );
 
         try (UDPDiscoverySender sender = new UDPDiscoverySender(
-                getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
-                getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
-                getUdpDiscoveryAttributes().getUdpTTL(),
-                getSerializer()))
+                getUdpDiscoveryAttributes(), getSerializer()))
         {
             sender.requestBroadcast();
 
@@ -223,10 +219,7 @@ public class UDPDiscoveryService
         // create this connection each time.
         // more robust
         try (UDPDiscoverySender sender = new UDPDiscoverySender(
-                getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
-                getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
-                getUdpDiscoveryAttributes().getUdpTTL(),
-                getSerializer()))
+                getUdpDiscoveryAttributes(), getSerializer()))
         {
             sender.passiveBroadcast(
                     getUdpDiscoveryAttributes().getServiceAddress(),
@@ -252,10 +245,7 @@ public class UDPDiscoveryService
         // create this connection each time.
         // more robust
         try (UDPDiscoverySender sender = new UDPDiscoverySender(
-                getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
-                getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
-                getUdpDiscoveryAttributes().getUdpTTL(),
-                getSerializer()))
+                getUdpDiscoveryAttributes(), getSerializer()))
         {
             sender.removeBroadcast(
                     getUdpDiscoveryAttributes().getServiceAddress(),
diff --git 
a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderUnitTest.java
 
b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderUnitTest.java
index 27d60d1..4e403f6 100644
--- 
a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderUnitTest.java
+++ 
b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderUnitTest.java
@@ -61,8 +61,13 @@ public class UDPDiscoverySenderUnitTest
         throws Exception
     {
         super.setUp();
+
         receiver = new UDPDiscoveryReceiver( null, null, ADDRESS, PORT );
-        sender = new UDPDiscoverySender( ADDRESS, PORT, 0, new 
StandardSerializer() );
+        receiver.setSerializer(new StandardSerializer());
+        final Thread t = new Thread( receiver );
+        t.start();
+
+        sender = new UDPDiscoverySender(null, ADDRESS, PORT, 1, new 
StandardSerializer());
     }
 
     /**
diff --git 
a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryServiceUnitTest.java
 
b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryServiceUnitTest.java
index c0c40c7..1c17ddb 100644
--- 
a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryServiceUnitTest.java
+++ 
b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryServiceUnitTest.java
@@ -29,25 +29,35 @@ import junit.framework.TestCase;
 public class UDPDiscoveryServiceUnitTest
     extends TestCase
 {
-    /** Verify that the list is updated. */
-    public void testAddOrUpdateService_NotInList()
+    private final static String host = "228.5.6.7";
+    private final static int port = 6789;
+
+    private UDPDiscoveryService service;
+    private MockDiscoveryListener discoveryListener;
+
+    @Override
+    protected void setUp() throws Exception
     {
+        super.setUp();
+
         // SETUP
-        final String host = "228.5.6.7";
-        final int port = 6789;
         final UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes();
         attributes.setUdpDiscoveryAddr( host );
         attributes.setUdpDiscoveryPort( port );
         attributes.setServicePort( 1000 );
 
         // create the service
-        final UDPDiscoveryService service = new 
UDPDiscoveryService(attributes, new StandardSerializer());
+        service = new UDPDiscoveryService(attributes, new 
StandardSerializer());
         service.startup();
         service.addParticipatingCacheName( "testCache1" );
 
-        final MockDiscoveryListener discoveryListener = new 
MockDiscoveryListener();
+        discoveryListener = new MockDiscoveryListener();
         service.addDiscoveryListener( discoveryListener );
+    }
 
+    /** Verify that the list is updated. */
+    public void testAddOrUpdateService_NotInList()
+    {
         final DiscoveredService discoveredService = new DiscoveredService();
         discoveredService.setServiceAddress( host );
         discoveredService.setCacheNames( new ArrayList<>() );
@@ -67,35 +77,19 @@ public class UDPDiscoveryServiceUnitTest
     /** Verify that the list is updated. */
     public void testAddOrUpdateService_InList_NamesDoNotChange()
     {
-        // SETUP
-        final String host = "228.5.6.7";
-        final int port = 6789;
-        final UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes();
-        attributes.setUdpDiscoveryAddr( host );
-        attributes.setUdpDiscoveryPort( port );
-        attributes.setServicePort( 1000 );
-
-        // create the service
-        final UDPDiscoveryService service = new 
UDPDiscoveryService(attributes, new StandardSerializer());
-        service.startup();
-        service.addParticipatingCacheName( "testCache1" );
-
-        final MockDiscoveryListener discoveryListener = new 
MockDiscoveryListener();
-        service.addDiscoveryListener( discoveryListener );
-
-        final ArrayList<String> sametCacheNames = new ArrayList<>();
-        sametCacheNames.add( "name1" );
+        final ArrayList<String> sameCacheNames = new ArrayList<>();
+        sameCacheNames.add( "name1" );
 
         final DiscoveredService discoveredService = new DiscoveredService();
         discoveredService.setServiceAddress( host );
-        discoveredService.setCacheNames( sametCacheNames );
+        discoveredService.setCacheNames( sameCacheNames );
         discoveredService.setServicePort( 1000 );
         discoveredService.setLastHearFromTime( 100 );
 
 
         final DiscoveredService discoveredService2 = new DiscoveredService();
         discoveredService2.setServiceAddress( host );
-        discoveredService2.setCacheNames( sametCacheNames );
+        discoveredService2.setCacheNames( sameCacheNames );
         discoveredService2.setServicePort( 1000 );
         discoveredService2.setLastHearFromTime( 500 );
 
@@ -130,22 +124,6 @@ public class UDPDiscoveryServiceUnitTest
     /** Verify that the list is updated. */
     public void testAddOrUpdateService_InList_NamesChange()
     {
-        // SETUP
-        final String host = "228.5.6.7";
-        final int port = 6789;
-        final UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes();
-        attributes.setUdpDiscoveryAddr( host );
-        attributes.setUdpDiscoveryPort( port );
-        attributes.setServicePort( 1000 );
-
-        // create the service
-        final UDPDiscoveryService service = new 
UDPDiscoveryService(attributes, new StandardSerializer());
-        service.startup();
-        service.addParticipatingCacheName( "testCache1" );
-
-        final MockDiscoveryListener discoveryListener = new 
MockDiscoveryListener();
-        service.addDiscoveryListener( discoveryListener );
-
         final DiscoveredService discoveredService = new DiscoveredService();
         discoveredService.setServiceAddress( host );
         discoveredService.setCacheNames( new ArrayList<>() );
@@ -192,22 +170,6 @@ public class UDPDiscoveryServiceUnitTest
     /** Verify that the list is updated. */
     public void testRemoveDiscoveredService()
     {
-        // SETUP
-        final String host = "228.5.6.7";
-        final int port = 6789;
-        final UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes();
-        attributes.setUdpDiscoveryAddr( host );
-        attributes.setUdpDiscoveryPort( port );
-        attributes.setServicePort( 1000 );
-
-        // create the service
-        final UDPDiscoveryService service = new 
UDPDiscoveryService(attributes, new StandardSerializer());
-        service.startup();
-        service.addParticipatingCacheName( "testCache1" );
-
-        final MockDiscoveryListener discoveryListener = new 
MockDiscoveryListener();
-        service.addDiscoveryListener( discoveryListener );
-
         final DiscoveredService discoveredService = new DiscoveredService();
         discoveredService.setServiceAddress( host );
         discoveredService.setCacheNames( new ArrayList<>() );
diff --git 
a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryUnitTest.java
 
b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryUnitTest.java
index 92336c3..719aa7d 100644
--- 
a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryUnitTest.java
+++ 
b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryUnitTest.java
@@ -36,13 +36,34 @@ public class UDPDiscoveryUnitTest
      * <p>
      * @throws Exception
      */
-    public void testSimpleUDPDiscovery()
+    public void testSimpleUDPDiscoveryIPv4()
+        throws Exception
+    {
+        simpleUDPDiscovery("228.5.6.7");
+    }
+
+    /**
+     * <p>
+     * @throws Exception
+     */
+    public void testSimpleUDPDiscoveryIPv6()
+        throws Exception
+    {
+        simpleUDPDiscovery("FF02::5678");
+    }
+
+    /**
+     * <p>
+     * @throws Exception
+     */
+    private void simpleUDPDiscovery(String discoveryAddress)
         throws Exception
     {
         final UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes();
-        attributes.setUdpDiscoveryAddr( /*"FF7E:230::1234"*/ "228.5.6.7" );
-        attributes.setUdpDiscoveryPort( 6789 );
-        attributes.setServicePort( 1000 );
+        attributes.setUdpDiscoveryAddr(discoveryAddress);
+        attributes.setUdpDiscoveryPort(6789);
+        attributes.setServicePort(1000);
+        attributes.setUdpTTL(4); /* datagram TTL */
 
         // create the service
         final UDPDiscoveryService service = new 
UDPDiscoveryService(attributes, new StandardSerializer());
@@ -61,39 +82,36 @@ public class UDPDiscoveryUnitTest
         t.start();
 
         // create a sender
-        final UDPDiscoverySender sender = new UDPDiscoverySender(
-                attributes.getUdpDiscoveryAddr(),
-                attributes.getUdpDiscoveryPort(),
-                4, /* datagram TTL */
-                service.getSerializer());
-
-        // create more names than we have no wait facades for
-        // the only one that gets added should be testCache1
-        final ArrayList<String> cacheNames = new ArrayList<>();
-        final int numJunk = 10;
-        for ( int i = 0; i < numJunk; i++ )
+        try (final UDPDiscoverySender sender = new UDPDiscoverySender(
+                attributes, service.getSerializer()))
         {
-            cacheNames.add( "junkCacheName" + i );
-        }
-        cacheNames.add( "testCache1" );
+            // create more names than we have no wait facades for
+            // the only one that gets added should be testCache1
+            final ArrayList<String> cacheNames = new ArrayList<>();
+            final int numJunk = 10;
+            for ( int i = 0; i < numJunk; i++ )
+            {
+                cacheNames.add( "junkCacheName" + i );
+            }
+            cacheNames.add( "testCache1" );
 
-        // send max messages
-        final int max = 10;
-        int cnt = 0;
-        for ( ; cnt < max; cnt++ )
-        {
-            sender.passiveBroadcast( "localhost", 1111, cacheNames, 1 );
-            SleepUtil.sleepAtLeast( 20 );
-        }
+            // send max messages
+            final int max = 10;
+            int cnt = 0;
+            for ( ; cnt < max; cnt++ )
+            {
+                sender.passiveBroadcast( "localhost", 1111, cacheNames, 1 );
+                SleepUtil.sleepAtLeast( 20 );
+            }
 
-        SleepUtil.sleepAtLeast( 200 );
+            SleepUtil.sleepAtLeast( 200 );
 
-        // check to see that we got 10 messages
-        //System.out.println( "Receiver count = " + receiver.getCnt() );
+            // check to see that we got 10 messages
+            //System.out.println( "Receiver count = " + receiver.getCnt() );
 
-        // request braodcasts change things.
-        assertTrue( "Receiver count [" + receiver.getCnt() + "] should be the 
at least the number sent [" + cnt + "].",
-                    cnt <= receiver.getCnt() );
-        sender.close();
+            // request broadcasts change things.
+            assertTrue( "Receiver count [" + receiver.getCnt() + "] should be 
the at least the number sent [" + cnt + "].",
+                        cnt <= receiver.getCnt() );
+        }
     }
 }

Reply via email to