CAMEL-7998 Merged the patch into camel-netty
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/62151db1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/62151db1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/62151db1 Branch: refs/heads/master Commit: 62151db18bc328dc59b74be4418c866547b28660 Parents: 85a0cd9 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Mon Nov 10 20:43:46 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Mon Nov 10 21:11:42 2014 +0800 ---------------------------------------------------------------------- .../component/netty/NettyConfiguration.java | 10 ++ .../camel/component/netty/NettyProducer.java | 32 +++++- .../NettyComponentWithConfigurationTest.java | 27 +++++ .../netty/NettyUdpConnectedSendTest.java | 114 ++++++++++++++++++ .../netty/NettyUdpConnectionlessSendTest.java | 115 +++++++++++++++++++ 5 files changed, 293 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/62151db1/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java index 8caf41e..f3999a7 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java @@ -90,6 +90,8 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem private long producerPoolMinEvictableIdle = 5 * 60 * 1000L; @UriParam private boolean producerPoolEnabled = true; + @UriParam + private boolean udpConnectionlessSending; /** * Returns a copy of this configuration @@ 
-445,6 +447,14 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem public void setProducerPoolEnabled(boolean producerPoolEnabled) { this.producerPoolEnabled = producerPoolEnabled; } + + public boolean isUdpConnectionlessSending() { + return udpConnectionlessSending; + } + + public void setUdpConnectionlessSending(boolean udpConnectionlessSending) { + this.udpConnectionlessSending = udpConnectionlessSending; + } private static <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) { if (handlers != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/62151db1/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java index 5856050..8d54f0a 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java @@ -42,6 +42,7 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.SucceededChannelFuture; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; @@ -234,8 +235,14 @@ public class NettyProducer extends DefaultAsyncProducer { // setup state as attachment on the channel, so we can access the state later when needed channel.setAttachment(new NettyCamelState(producerCallback, exchange)); + InetSocketAddress remoteAddress = null; + if (!isTcp() && configuration.isUdpConnectionlessSending()) { + // Need to 
specify the remoteAddress here + remoteAddress = new InetSocketAddress(configuration.getHost(), configuration.getPort()); + } + // write body - NettyHelper.writeBodyAsync(LOG, channel, null, body, exchange, new ChannelFutureListener() { + NettyHelper.writeBodyAsync(LOG, channel, remoteAddress, body, exchange, new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { LOG.trace("Operation complete {}", channelFuture); if (!channelFuture.isSuccess()) { @@ -398,9 +405,18 @@ public class NettyProducer extends DefaultAsyncProducer { connectionlessClientBootstrap.setPipelineFactory(pipelineFactory); // bind and store channel so we can close it when stopping Channel channel = connectionlessClientBootstrap.bind(new InetSocketAddress(0)); + allChannels.add(channel); - answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); - + // if udp connectionless sending is true we don't do a connect. + // we just send on the channel created with bind which means + // really fire and forget. 
You won't get a PortUnreachableException + // if no one is listening on the port + if (!configuration.isUdpConnectionlessSending()) { + answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + } else { + answer = new SucceededChannelFuture(channel); + } + if (LOG.isDebugEnabled()) { LOG.debug("Created new UDP client bootstrap connecting to {}:{} with options: {}", new Object[]{configuration.getHost(), configuration.getPort(), connectionlessClientBootstrap.getOptions()}); @@ -523,8 +539,14 @@ public class NettyProducer extends DefaultAsyncProducer { @Override public boolean validateObject(Channel channel) { - // we need a connected channel to be valid - boolean answer = channel.isConnected(); + boolean answer = false; + if (configuration.isUdpConnectionlessSending()) { + // we don't need to check whether the channel is connected + answer = channel.isOpen(); + } else { + // we need a connected channel to be valid + answer = channel.isConnected(); + } LOG.trace("Validating channel: {} -> {}", channel, answer); return answer; } http://git-wip-us.apache.org/repos/asf/camel/blob/62151db1/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java index 3f598bc..59d4fd2 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java @@ -53,5 +53,32 @@ public class NettyComponentWithConfigurationTest extends CamelTestSupport { assertEquals(4455, e1.getConfiguration().getPort()); 
assertEquals(5566, e2.getConfiguration().getPort()); } + + @Test + public void testNettyComponentUdpWithConfiguration() throws Exception { + NettyComponent comp = context.getComponent("netty", NettyComponent.class); + + NettyConfiguration cfg = new NettyConfiguration(); + + comp.setConfiguration(cfg); + assertSame(cfg, comp.getConfiguration()); + + NettyEndpoint e1 = (NettyEndpoint) comp.createEndpoint("netty://udp://localhost:8601?sync=false"); + NettyEndpoint e2 = (NettyEndpoint) comp.createEndpoint("netty://udp://localhost:8602?sync=false&udpConnectionlessSending=true"); + + // should not be same + assertNotSame(e1, e2); + assertNotSame(e1.getConfiguration(), e2.getConfiguration()); + + // both endpoints are sync=false + assertEquals(false, e1.getConfiguration().isSync()); + assertEquals(false, e2.getConfiguration().isSync()); + // if not set it should be false + assertEquals(false, e1.getConfiguration().isUdpConnectionlessSending()); + assertEquals(true, e2.getConfiguration().isUdpConnectionlessSending()); + + assertEquals(8601, e1.getConfiguration().getPort()); + assertEquals(8602, e2.getConfiguration().getPort()); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/62151db1/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java new file mode 100644 index 0000000..aef3ae2 --- /dev/null +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.net.InetSocketAddress; + +import org.apache.camel.builder.RouteBuilder; +import org.jboss.netty.bootstrap.ConnectionlessBootstrap; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; +import org.jboss.netty.handler.codec.string.StringDecoder; +import org.jboss.netty.util.CharsetUtil; +import org.junit.Test; + + +public class NettyUdpConnectedSendTest extends BaseNettyTest { + private static final String SEND_STRING = "***<We all love camel>***"; + private static final int SEND_COUNT = 20; + private int receivedCount; + private ConnectionlessBootstrap bootstrap; + + public void createNettyUdpReceiver() { + bootstrap = new ConnectionlessBootstrap(new NioDatagramChannelFactory()); + bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline channelPipeline = Channels.pipeline(); + channelPipeline.addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8)); + channelPipeline.addLast("ContentHandler", new 
ContentHandler()); + return channelPipeline; + } + }); + + } + + + public void bind() { + bootstrap.bind(new InetSocketAddress(8601)); + } + + public void stop() { + bootstrap.shutdown(); + } + + @Test + public void sendConnectedUdp() throws Exception { + createNettyUdpReceiver(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + bind(); + } + }); + t.start(); + Thread.sleep(1000); + for (int i = 0; i < SEND_COUNT; ++i) { + template.sendBody("direct:in", SEND_STRING); + } + Thread.sleep(1000); + stop(); + assertTrue("We should have received some datagrams", receivedCount > 0); + } + + @Test + public void sendConnectedWithoutReceiver() throws Exception { + int exceptionCount = 0; + for (int i = 0; i < SEND_COUNT; ++i) { + try { + template.sendBody("direct:in", SEND_STRING); + } catch (Exception ex) { + ++exceptionCount; + } + } + assertTrue("There should at least one exception because port is unreachable", exceptionCount > 0); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:in").to("netty:udp://localhost:8601?sync=false&textline=true"); + } + }; + } + + public class ContentHandler extends SimpleChannelUpstreamHandler { + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception { + String s = (String)messageEvent.getMessage(); + receivedCount++; + assertEquals(SEND_STRING, s.trim()); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/62151db1/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectionlessSendTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectionlessSendTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectionlessSendTest.java new 
file mode 100644 index 0000000..bd36e4a --- /dev/null +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectionlessSendTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.net.InetSocketAddress; + +import org.apache.camel.builder.RouteBuilder; +import org.jboss.netty.bootstrap.ConnectionlessBootstrap; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; +import org.jboss.netty.handler.codec.string.StringDecoder; +import org.jboss.netty.util.CharsetUtil; +import org.junit.Test; + + +public class NettyUdpConnectionlessSendTest extends BaseNettyTest { + private static final String SEND_STRING = "***<We all love camel>***"; + private static final int SEND_COUNT = 20; + private int receivedCount; + private ConnectionlessBootstrap bootstrap; + + public void createNettyUdpReceiver() { 
+ bootstrap = new ConnectionlessBootstrap(new NioDatagramChannelFactory()); + bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline channelPipeline = Channels.pipeline(); + channelPipeline.addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8)); + channelPipeline.addLast("ContentHandler", new ContentHandler()); + return channelPipeline; + } + }); + + } + + + public void bind() { + bootstrap.bind(new InetSocketAddress(8601)); + } + + public void stop() { + bootstrap.shutdown(); + } + + @Test + public void sendConnectionlessUdp() throws Exception { + createNettyUdpReceiver(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + bind(); + } + }); + t.start(); + Thread.sleep(1000); + for (int i = 0; i < SEND_COUNT; ++i) { + template.sendBody("direct:in", SEND_STRING); + } + Thread.sleep(1000); + stop(); + assertTrue("We should have received some datagrams", receivedCount > 0); + + } + + @Test + public void sendWithoutReceiver() throws Exception { + int exceptionCount = 0; + for (int i = 0; i < SEND_COUNT; ++i) { + try { + template.sendBody("direct:in", SEND_STRING); + } catch (Exception ex) { + ++exceptionCount; + } + } + assertEquals("No exception should occur", 0, exceptionCount); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:in").to("netty:udp://localhost:8601?sync=false&textline=true&udpConnectionlessSending=true"); + } + }; + } + + public class ContentHandler extends SimpleChannelUpstreamHandler { + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception { + String s = (String)messageEvent.getMessage(); + receivedCount++; + assertEquals(SEND_STRING, s.trim()); + } + } +}