This is an automated email from the ASF dual-hosted git repository. tv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-jcs.git
commit 727172b3c84da21ade3e7118080365db87b1c3ea Author: Thomas Vandahl <[email protected]> AuthorDate: Sat Mar 27 17:46:16 2021 +0100 Use NIO and Selector, make IPv6 multicast work --- .../jcs3/utils/discovery/UDPDiscoveryReceiver.java | 213 +++++++++++++-------- .../jcs3/utils/discovery/UDPDiscoverySender.java | 46 ++++- .../jcs3/utils/discovery/UDPDiscoveryService.java | 20 +- .../discovery/UDPDiscoverySenderUnitTest.java | 7 +- .../discovery/UDPDiscoveryServiceUnitTest.java | 78 ++------ .../jcs3/utils/discovery/UDPDiscoveryUnitTest.java | 84 ++++---- 6 files changed, 259 insertions(+), 189 deletions(-) diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java index 66a5754..b1b31c5 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java @@ -20,15 +20,25 @@ package org.apache.commons.jcs3.utils.discovery; */ import java.io.IOException; -import java.net.DatagramPacket; +import java.net.Inet6Address; import java.net.InetAddress; -import java.net.MulticastSocket; +import java.net.InetSocketAddress; import java.net.NetworkInterface; +import java.net.StandardProtocolFamily; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.MembershipKey; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.jcs3.engine.CacheInfo; +import org.apache.commons.jcs3.engine.behavior.IElementSerializer; import org.apache.commons.jcs3.engine.behavior.IShutdownObserver; import org.apache.commons.jcs3.log.Log; import org.apache.commons.jcs3.log.LogManager; @@ -44,8 +54,14 @@ public class UDPDiscoveryReceiver /** The log factory */ private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class ); - /** The socket used for communication. */ - private MulticastSocket mSocket; + /** The channel used for communication. */ + private DatagramChannel multicastChannel; + + /** The group membership key. */ + private MembershipKey multicastGroupKey; + + /** The selector. */ + private Selector selector; /** * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight @@ -62,8 +78,8 @@ public class UDPDiscoveryReceiver /** Service to get cache names and handle request broadcasts */ private final UDPDiscoveryService service; - /** Multicast address */ - private final InetAddress multicastAddress; + /** Serializer */ + private IElementSerializer serializer; /** Is it shutdown. */ private AtomicBoolean shutdown = new AtomicBoolean(false); @@ -84,7 +100,11 @@ public class UDPDiscoveryReceiver throws IOException { this.service = service; - this.multicastAddress = InetAddress.getByName( multicastAddressString ); + if (service != null) + { + this.serializer = service.getSerializer(); + } + InetAddress multicastAddress = InetAddress.getByName( multicastAddressString ); // create a small thread pool to handle a barrage this.pooledExecutor = ThreadPoolManager.getInstance().createPool( @@ -93,7 +113,6 @@ public class UDPDiscoveryReceiver "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY); log.info( "Constructing listener, [{0}:{1}]", multicastAddress, multicastPort ); - createSocket( multicastInterfaceString, multicastAddress, multicastPort ); } @@ -111,12 +130,6 @@ public class UDPDiscoveryReceiver { try { - mSocket = new MulticastSocket( multicastPort ); - if (log.isInfoEnabled()) - { - log.info( "Joining Group: [{0}]", multicastAddress ); - } - // Use dedicated interface if specified NetworkInterface multicastInterface = null; if (multicastInterfaceString != null) @@ -127,13 +140,20 @@ public class UDPDiscoveryReceiver { multicastInterface = HostNameUtil.getMulticastNetworkInterface(); } - if (multicastInterface != null) - { - log.info("Using network interface {0}", multicastInterface.getDisplayName()); - mSocket.setNetworkInterface(multicastInterface); - } - mSocket.joinGroup( multicastAddress ); + multicastChannel = DatagramChannel.open( + multicastAddress instanceof Inet6Address ? + StandardProtocolFamily.INET6 : StandardProtocolFamily.INET) + .setOption(StandardSocketOptions.SO_REUSEADDR, true) + .setOption(StandardSocketOptions.IP_MULTICAST_IF, multicastInterface) + .bind(new InetSocketAddress(multicastPort)); + multicastChannel.configureBlocking(false); + + log.info("Joining Group: [{0}] on {1}", multicastAddress, multicastInterface); + multicastGroupKey = multicastChannel.join(multicastAddress, multicastInterface); + + selector = Selector.open(); + multicastChannel.register(selector, SelectionKey.OP_READ); } catch ( final IOException e ) { @@ -143,97 +163,120 @@ public class UDPDiscoveryReceiver } } + private final ArrayBlockingQueue<UDPDiscoveryMessage> msgQueue = + new ArrayBlockingQueue<>(maxPoolSize); + /** - * Highly unreliable. If it is processing one message while another comes in, the second - * message is lost. This is for low concurrency peppering. - * <p> + * Wait for multicast message + * * @return the object message * @throws IOException + * @deprecated no longer used */ + @Deprecated public Object waitForMessage() throws IOException { - final byte[] mBuffer = new byte[65536]; - final DatagramPacket packet = new DatagramPacket(mBuffer, mBuffer.length); - Object obj = null; try { - log.debug( "Waiting for message." ); - - mSocket.receive( packet ); - - log.debug( "Received packet from address [{0}]", - () -> packet.getSocketAddress() ); - - obj = service.getSerializer().deSerialize(mBuffer, null); - - if ( obj instanceof UDPDiscoveryMessage ) - { - // Ensure that the address we're supposed to send to is, indeed, the address - // of the machine on the other end of this connection. This guards against - // instances where we don't exactly get the right local host address - final UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj; - msg.setHost(packet.getAddress().getHostAddress()); - - log.debug( "Read object from address [{0}], object=[{1}]", - packet.getSocketAddress(), obj ); - } + return msgQueue.take(); } - catch ( final IOException | ClassNotFoundException e ) + catch (InterruptedException e) { - log.error( "Error receiving multicast packet", e ); + throw new IOException("Interrupted waiting for message", e); } - - return obj; } - /** Main processing method for the LateralUDPReceiver object */ + /** Main processing method for the UDPDiscoveryReceiver object */ @Override public void run() { try { + ByteBuffer byteBuffer = ByteBuffer.allocate(65536); + log.debug( "Waiting for message." ); + while (!shutdown.get()) { - final Object obj = waitForMessage(); + int activeKeys = selector.select(); + if (activeKeys == 0) + { + continue; + } - cnt.incrementAndGet(); + for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) + { + if (shutdown.get()) + { + break; + } - log.debug( "{0} messages received.", this::getCnt ); + SelectionKey key = i.next(); - try - { - UDPDiscoveryMessage message = (UDPDiscoveryMessage) obj; - // check for null - if ( message != null ) + if (!key.isValid()) { - pooledExecutor.execute(() -> handleMessage(message)); - log.debug( "Passed handler to executor." ); + continue; } - else + + if (key.isReadable()) { - log.warn( "message is null" ); + cnt.incrementAndGet(); + log.debug( "{0} messages received.", this::getCnt ); + + DatagramChannel mc = (DatagramChannel) key.channel(); + + byteBuffer.clear(); + InetSocketAddress sourceAddress = + (InetSocketAddress) mc.receive(byteBuffer); + byteBuffer.flip(); + + try + { + log.debug("Received packet from address [{0}]", sourceAddress); + + Object obj = serializer.deSerialize(byteBuffer.array(), null); + + if (obj instanceof UDPDiscoveryMessage) + { + // Ensure that the address we're supposed to send to is, indeed, the address + // of the machine on the other end of this connection. This guards against + // instances where we don't exactly get the right local host address + final UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj; + msg.setHost(sourceAddress.getHostString()); + + log.debug( "Read object from address [{0}], object=[{1}]", + sourceAddress, obj ); + + // Just to keep the functionality of the deprecated waitForMessage method + synchronized (msgQueue) + { + // Check if queue full already? + if (msgQueue.size() == maxPoolSize) + { + // remove oldest element from queue + msgQueue.remove(); + } + + msgQueue.add(msg); + } + + pooledExecutor.execute(() -> handleMessage(msg)); + log.debug( "Passed handler to executor." ); + } + } + catch ( final IOException | ClassNotFoundException e ) + { + log.error( "Error receiving multicast packet", e ); + } + + i.remove(); } } - catch ( final ClassCastException cce ) - { - log.warn( "Received unknown message type", cce.getMessage() ); - } } // end while } catch ( final IOException e ) { log.error( "Unexpected exception in UDP receiver.", e ); - try - { - Thread.sleep( 100 ); - // TODO consider some failure count so we don't do this - // forever. - } - catch ( final InterruptedException e2 ) - { - log.error( "Problem sleeping", e2 ); - } } } @@ -254,6 +297,16 @@ public class UDPDiscoveryReceiver } /** + * For testing + * + * @param serializer the serializer to set + */ + protected void setSerializer(IElementSerializer serializer) + { + this.serializer = serializer; + } + + /** * Separate thread run when a command comes into the UDPDiscoveryReceiver. * @deprectaed No longer used */ @@ -280,7 +333,6 @@ public class UDPDiscoveryReceiver { handleMessage(message); } - } /** @@ -343,8 +395,9 @@ public class UDPDiscoveryReceiver { try { - mSocket.leaveGroup( multicastAddress ); - mSocket.close(); + selector.close(); + multicastGroupKey.drop(); + multicastChannel.close(); } catch ( final IOException e ) { diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java index b35be3c..97ab1bd 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; +import java.net.NetworkInterface; import java.util.ArrayList; import org.apache.commons.jcs3.engine.CacheInfo; @@ -31,6 +32,7 @@ import org.apache.commons.jcs3.engine.behavior.IElementSerializer; import org.apache.commons.jcs3.log.Log; import org.apache.commons.jcs3.log.LogManager; import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryMessage.BroadcastType; +import org.apache.commons.jcs3.utils.net.HostNameUtil; import org.apache.commons.jcs3.utils.serialization.StandardSerializer; /** @@ -72,7 +74,7 @@ public class UDPDiscoverySender implements AutoCloseable public UDPDiscoverySender( final String host, final int port, final int udpTTL ) throws IOException { - this(host, port, udpTTL, new StandardSerializer()); + this(null, host, port, udpTTL, new StandardSerializer()); } /** @@ -82,19 +84,43 @@ public class UDPDiscoverySender implements AutoCloseable * <p> * When you are done sending, you should destroy the socket sender. * <p> + * @param udpDiscoveryAttributes configuration object + * @param serializer the Serializer to use when sending messages + * @throws IOException + */ + public UDPDiscoverySender(final UDPDiscoveryAttributes udpDiscoveryAttributes, final IElementSerializer serializer) + throws IOException + { + this(udpDiscoveryAttributes.getUdpDiscoveryInterface(), + udpDiscoveryAttributes.getUdpDiscoveryAddr(), + udpDiscoveryAttributes.getUdpDiscoveryPort(), + udpDiscoveryAttributes.getUdpTTL(), + serializer); + } + + /** + * Constructor for the UDPDiscoverySender object + * <p> + * This sender can be used to send multiple messages. + * <p> + * When you are done sending, you should destroy the socket sender. + * <p> + * @param mcastInterface the Multicast interface name to use, if null, try to autodetect * @param host * @param port * @param udpTTL the Datagram packet time-to-live * @param serializer the Serializer to use when sending messages * @throws IOException */ - public UDPDiscoverySender( final String host, final int port, final int udpTTL, IElementSerializer serializer) + public UDPDiscoverySender(final String mcastInterface, final String host, + final int port, final int udpTTL, IElementSerializer serializer) throws IOException { try { log.debug( "Constructing socket for sender on port [{0}]", port ); localSocket = new MulticastSocket( port ); + if (udpTTL > 0) { log.debug( "Setting datagram TTL to [{0}]", udpTTL ); @@ -103,6 +129,22 @@ public class UDPDiscoverySender implements AutoCloseable // Remote address. multicastAddress = InetAddress.getByName( host ); + + // Use dedicated interface if specified + NetworkInterface multicastInterface = null; + if (mcastInterface != null) + { + multicastInterface = NetworkInterface.getByName(mcastInterface); + } + else + { + multicastInterface = HostNameUtil.getMulticastNetworkInterface(); + } + if (multicastInterface != null) + { + log.info("Using network interface {0}", multicastInterface.getDisplayName()); + localSocket.setNetworkInterface(multicastInterface); + } } catch ( final IOException e ) { diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java index 8d08463..bb7b9e5 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java @@ -106,7 +106,8 @@ public class UDPDiscoveryService */ public UDPDiscoveryService(final UDPDiscoveryAttributes attributes, IElementSerializer serializer) { - udpDiscoveryAttributes = attributes.clone(); + this.udpDiscoveryAttributes = attributes.clone(); + this.serializer = serializer; try { @@ -134,8 +135,6 @@ public class UDPDiscoveryService getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e ); } - this.serializer = serializer; - // initiate sender broadcast initiateBroadcast(); } @@ -197,10 +196,7 @@ public class UDPDiscoveryService () -> getUdpDiscoveryAttributes().getServicePort() ); try (UDPDiscoverySender sender = new UDPDiscoverySender( - getUdpDiscoveryAttributes().getUdpDiscoveryAddr(), - getUdpDiscoveryAttributes().getUdpDiscoveryPort(), - getUdpDiscoveryAttributes().getUdpTTL(), - getSerializer())) + getUdpDiscoveryAttributes(), getSerializer())) { sender.requestBroadcast(); @@ -223,10 +219,7 @@ public class UDPDiscoveryService // create this connection each time. // more robust try (UDPDiscoverySender sender = new UDPDiscoverySender( - getUdpDiscoveryAttributes().getUdpDiscoveryAddr(), - getUdpDiscoveryAttributes().getUdpDiscoveryPort(), - getUdpDiscoveryAttributes().getUdpTTL(), - getSerializer())) + getUdpDiscoveryAttributes(), getSerializer())) { sender.passiveBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), @@ -252,10 +245,7 @@ public class UDPDiscoveryService // create this connection each time. // more robust try (UDPDiscoverySender sender = new UDPDiscoverySender( - getUdpDiscoveryAttributes().getUdpDiscoveryAddr(), - getUdpDiscoveryAttributes().getUdpDiscoveryPort(), - getUdpDiscoveryAttributes().getUdpTTL(), - getSerializer())) + getUdpDiscoveryAttributes(), getSerializer())) { sender.removeBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), diff --git a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderUnitTest.java b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderUnitTest.java index 27d60d1..4e403f6 100644 --- a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderUnitTest.java +++ b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderUnitTest.java @@ -61,8 +61,13 @@ public class UDPDiscoverySenderUnitTest throws Exception { super.setUp(); + receiver = new UDPDiscoveryReceiver( null, null, ADDRESS, PORT ); - sender = new UDPDiscoverySender( ADDRESS, PORT, 0, new StandardSerializer() ); + receiver.setSerializer(new StandardSerializer()); + final Thread t = new Thread( receiver ); + t.start(); + + sender = new UDPDiscoverySender(null, ADDRESS, PORT, 1, new StandardSerializer()); } /** diff --git a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryServiceUnitTest.java b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryServiceUnitTest.java index c0c40c7..1c17ddb 100644 --- a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryServiceUnitTest.java +++ b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryServiceUnitTest.java @@ -29,25 +29,35 @@ import junit.framework.TestCase; public class UDPDiscoveryServiceUnitTest extends TestCase { - /** Verify that the list is updated. */ - public void testAddOrUpdateService_NotInList() + private final static String host = "228.5.6.7"; + private final static int port = 6789; + + private UDPDiscoveryService service; + private MockDiscoveryListener discoveryListener; + + @Override + protected void setUp() throws Exception { + super.setUp(); + // SETUP - final String host = "228.5.6.7"; - final int port = 6789; final UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes(); attributes.setUdpDiscoveryAddr( host ); attributes.setUdpDiscoveryPort( port ); attributes.setServicePort( 1000 ); // create the service - final UDPDiscoveryService service = new UDPDiscoveryService(attributes, new StandardSerializer()); + service = new UDPDiscoveryService(attributes, new StandardSerializer()); service.startup(); service.addParticipatingCacheName( "testCache1" ); - final MockDiscoveryListener discoveryListener = new MockDiscoveryListener(); + discoveryListener = new MockDiscoveryListener(); service.addDiscoveryListener( discoveryListener ); + } + /** Verify that the list is updated. */ + public void testAddOrUpdateService_NotInList() + { final DiscoveredService discoveredService = new DiscoveredService(); discoveredService.setServiceAddress( host ); discoveredService.setCacheNames( new ArrayList<>() ); @@ -67,35 +77,19 @@ public class UDPDiscoveryServiceUnitTest /** Verify that the list is updated. */ public void testAddOrUpdateService_InList_NamesDoNotChange() { - // SETUP - final String host = "228.5.6.7"; - final int port = 6789; - final UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes(); - attributes.setUdpDiscoveryAddr( host ); - attributes.setUdpDiscoveryPort( port ); - attributes.setServicePort( 1000 ); - - // create the service - final UDPDiscoveryService service = new UDPDiscoveryService(attributes, new StandardSerializer()); - service.startup(); - service.addParticipatingCacheName( "testCache1" ); - - final MockDiscoveryListener discoveryListener = new MockDiscoveryListener(); - service.addDiscoveryListener( discoveryListener ); - - final ArrayList<String> sametCacheNames = new ArrayList<>(); - sametCacheNames.add( "name1" ); + final ArrayList<String> sameCacheNames = new ArrayList<>(); + sameCacheNames.add( "name1" ); final DiscoveredService discoveredService = new DiscoveredService(); discoveredService.setServiceAddress( host ); - discoveredService.setCacheNames( sametCacheNames ); + discoveredService.setCacheNames( sameCacheNames ); discoveredService.setServicePort( 1000 ); discoveredService.setLastHearFromTime( 100 ); final DiscoveredService discoveredService2 = new DiscoveredService(); discoveredService2.setServiceAddress( host ); - discoveredService2.setCacheNames( sametCacheNames ); + discoveredService2.setCacheNames( sameCacheNames ); discoveredService2.setServicePort( 1000 ); discoveredService2.setLastHearFromTime( 500 ); @@ -130,22 +124,6 @@ public class UDPDiscoveryServiceUnitTest /** Verify that the list is updated. */ public void testAddOrUpdateService_InList_NamesChange() { - // SETUP - final String host = "228.5.6.7"; - final int port = 6789; - final UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes(); - attributes.setUdpDiscoveryAddr( host ); - attributes.setUdpDiscoveryPort( port ); - attributes.setServicePort( 1000 ); - - // create the service - final UDPDiscoveryService service = new UDPDiscoveryService(attributes, new StandardSerializer()); - service.startup(); - service.addParticipatingCacheName( "testCache1" ); - - final MockDiscoveryListener discoveryListener = new MockDiscoveryListener(); - service.addDiscoveryListener( discoveryListener ); - final DiscoveredService discoveredService = new DiscoveredService(); discoveredService.setServiceAddress( host ); discoveredService.setCacheNames( new ArrayList<>() ); @@ -192,22 +170,6 @@ public class UDPDiscoveryServiceUnitTest /** Verify that the list is updated. */ public void testRemoveDiscoveredService() { - // SETUP - final String host = "228.5.6.7"; - final int port = 6789; - final UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes(); - attributes.setUdpDiscoveryAddr( host ); - attributes.setUdpDiscoveryPort( port ); - attributes.setServicePort( 1000 ); - - // create the service - final UDPDiscoveryService service = new UDPDiscoveryService(attributes, new StandardSerializer()); - service.startup(); - service.addParticipatingCacheName( "testCache1" ); - - final MockDiscoveryListener discoveryListener = new MockDiscoveryListener(); - service.addDiscoveryListener( discoveryListener ); - final DiscoveredService discoveredService = new DiscoveredService(); discoveredService.setServiceAddress( host ); discoveredService.setCacheNames( new ArrayList<>() ); diff --git a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryUnitTest.java b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryUnitTest.java index 92336c3..719aa7d 100644 --- a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryUnitTest.java +++ b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryUnitTest.java @@ -36,13 +36,34 @@ public class UDPDiscoveryUnitTest * <p> * @throws Exception */ - public void testSimpleUDPDiscovery() + public void testSimpleUDPDiscoveryIPv4() + throws Exception + { + simpleUDPDiscovery("228.5.6.7"); + } + + /** + * <p> + * @throws Exception + */ + public void testSimpleUDPDiscoveryIPv6() + throws Exception + { + simpleUDPDiscovery("FF02::5678"); + } + + /** + * <p> + * @throws Exception + */ + private void simpleUDPDiscovery(String discoveryAddress) throws Exception { final UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes(); - attributes.setUdpDiscoveryAddr( /*"FF7E:230::1234"*/ "228.5.6.7" ); - attributes.setUdpDiscoveryPort( 6789 ); - attributes.setServicePort( 1000 ); + attributes.setUdpDiscoveryAddr(discoveryAddress); + attributes.setUdpDiscoveryPort(6789); + attributes.setServicePort(1000); + attributes.setUdpTTL(4); /* datagram TTL */ // create the service final UDPDiscoveryService service = new UDPDiscoveryService(attributes, new StandardSerializer()); @@ -61,39 +82,36 @@ public class UDPDiscoveryUnitTest t.start(); // create a sender - final UDPDiscoverySender sender = new UDPDiscoverySender( - attributes.getUdpDiscoveryAddr(), - attributes.getUdpDiscoveryPort(), - 4, /* datagram TTL */ - service.getSerializer()); - - // create more names than we have no wait facades for - // the only one that gets added should be testCache1 - final ArrayList<String> cacheNames = new ArrayList<>(); - final int numJunk = 10; - for ( int i = 0; i < numJunk; i++ ) + try (final UDPDiscoverySender sender = new UDPDiscoverySender( + attributes, service.getSerializer())) { - cacheNames.add( "junkCacheName" + i ); - } - cacheNames.add( "testCache1" ); + // create more names than we have no wait facades for + // the only one that gets added should be testCache1 + final ArrayList<String> cacheNames = new ArrayList<>(); + final int numJunk = 10; + for ( int i = 0; i < numJunk; i++ ) + { + cacheNames.add( "junkCacheName" + i ); + } + cacheNames.add( "testCache1" ); - // send max messages - final int max = 10; - int cnt = 0; - for ( ; cnt < max; cnt++ ) - { - sender.passiveBroadcast( "localhost", 1111, cacheNames, 1 ); - SleepUtil.sleepAtLeast( 20 ); - } + // send max messages + final int max = 10; + int cnt = 0; + for ( ; cnt < max; cnt++ ) + { + sender.passiveBroadcast( "localhost", 1111, cacheNames, 1 ); + SleepUtil.sleepAtLeast( 20 ); + } - SleepUtil.sleepAtLeast( 200 ); + SleepUtil.sleepAtLeast( 200 ); - // check to see that we got 10 messages - //System.out.println( "Receiver count = " + receiver.getCnt() ); + // check to see that we got 10 messages + //System.out.println( "Receiver count = " + receiver.getCnt() ); - // request braodcasts change things. - assertTrue( "Receiver count [" + receiver.getCnt() + "] should be the at least the number sent [" + cnt + "].", - cnt <= receiver.getCnt() ); - sender.close(); + // request braodcasts change things. + assertTrue( "Receiver count [" + receiver.getCnt() + "] should be the at least the number sent [" + cnt + "].", + cnt <= receiver.getCnt() ); + } } }
