Updated Branches: refs/heads/master f3509ddc8 -> 8c90678d5
CAMEL-6563: camel-netty to join UDP multicast. Thanks to Sam Patel for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8c90678d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8c90678d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8c90678d Branch: refs/heads/master Commit: 8c90678d56db8ab75a56b75d7350042b5a04dafc Parents: f3509dd Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Jul 22 11:09:00 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Jul 22 11:09:00 2013 +0200 ---------------------------------------------------------------------- .../NettyServerBootstrapConfiguration.java | 12 ++++++++ .../SingleUDPNettyServerBootstrapFactory.java | 31 +++++++++++++++----- 2 files changed, 36 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8c90678d/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java index e7972fb..666415f 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java @@ -58,6 +58,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable { protected String passphrase; protected BossPool bossPool; protected WorkerPool workerPool; + protected String networkInterface; public String getAddress() { return host + ":" + port; @@ -319,6 +320,14 @@ public class NettyServerBootstrapConfiguration implements Cloneable { this.workerPool = workerPool; } + public String getNetworkInterface() { + return networkInterface; + } + + public void setNetworkInterface(String networkInterface) { + this.networkInterface = networkInterface; + } + /** * Checks if the other {@link NettyServerBootstrapConfiguration} is compatible * with this, as a Netty listener bound on port X shares the same common @@ -396,6 +405,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable { isCompatible = false; } else if (workerPool != other.workerPool) { isCompatible = false; + } else if (networkInterface != null && !networkInterface.equals(other.networkInterface)) { + isCompatible = false; } return isCompatible; @@ -433,6 +444,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable { + ", passphrase='" + passphrase + '\'' + ", bossPool=" + bossPool + ", workerPool=" + workerPool + + ", networkInterface='" + networkInterface + '\'' + '}'; } } http://git-wip-us.apache.org/repos/asf/camel/blob/8c90678d/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java index 5c59166..d73f67a 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java @@ -17,6 +17,9 @@ package org.apache.camel.component.netty; import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -30,10 +33,12 @@ import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.channel.socket.DatagramChannel; import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool; import org.jboss.netty.channel.socket.nio.WorkerPool; +import org.jboss.netty.handler.ipfilter.IpV4Subnet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +48,8 @@ import org.slf4j.LoggerFactory; public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory { protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class); + private static final String LOOPBACK_INTERFACE = "lo"; + private static final String MULTICAST_SUBNET = "224.0.0.0/4"; private final ChannelGroup allChannels; private CamelContext camelContext; private ThreadFactory threadFactory; @@ -50,7 +57,6 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme private ChannelPipelineFactory pipelineFactory; private DatagramChannelFactory datagramChannelFactory; private ConnectionlessBootstrap connectionlessBootstrap; - private Channel channel; private WorkerPool workerPool; public SingleUDPNettyServerBootstrapFactory() { @@ -98,7 +104,7 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme stopServerBootstrap(); } - protected void startServerBootstrap() { + protected void startServerBootstrap() throws UnknownHostException, SocketException { // create non-shared worker pool int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS; workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count); @@ -135,15 +141,26 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme // set the pipeline factory, which creates the pipeline for each newly created channels connectionlessBootstrap.setPipelineFactory(pipelineFactory); - LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort()); - channel = connectionlessBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); - // to keep track of all channels in use - allChannels.add(channel); + InetSocketAddress hostAddress = new InetSocketAddress(configuration.getHost(), configuration.getPort()); + IpV4Subnet multicastSubnet = new IpV4Subnet(MULTICAST_SUBNET); + + if (multicastSubnet.contains(configuration.getHost())) { + DatagramChannel channel = (DatagramChannel)connectionlessBootstrap.bind(hostAddress); + String networkInterface = configuration.getNetworkInterface() == null ? LOOPBACK_INTERFACE : configuration.getNetworkInterface(); + NetworkInterface multicastNetworkInterface = NetworkInterface.getByName(networkInterface); + LOG.info("ConnectionlessBootstrap joining {}:{} using network interface: {}", new Object[]{configuration.getHost(), configuration.getPort(), multicastNetworkInterface.getName()}); + channel.joinGroup(hostAddress, multicastNetworkInterface); + allChannels.add(channel); + } else { + LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort()); + Channel channel = connectionlessBootstrap.bind(hostAddress); + allChannels.add(channel); + } } protected void stopServerBootstrap() { // close all channels - LOG.info("ConnectionlessBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort()); + LOG.info("ConnectionlessBootstrap disconnecting from {}:{}", configuration.getHost(), configuration.getPort()); LOG.trace("Closing {} channels", allChannels.size()); ChannelGroupFuture future = allChannels.close();