CAMEL-7998 Merged the patch into camel-netty

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/62151db1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/62151db1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/62151db1

Branch: refs/heads/master
Commit: 62151db18bc328dc59b74be4418c866547b28660
Parents: 85a0cd9
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Mon Nov 10 20:43:46 2014 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Mon Nov 10 21:11:42 2014 +0800

----------------------------------------------------------------------
 .../component/netty/NettyConfiguration.java     |  10 ++
 .../camel/component/netty/NettyProducer.java    |  32 +++++-
 .../NettyComponentWithConfigurationTest.java    |  27 +++++
 .../netty/NettyUdpConnectedSendTest.java        | 114 ++++++++++++++++++
 .../netty/NettyUdpConnectionlessSendTest.java   | 115 +++++++++++++++++++
 5 files changed, 293 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/62151db1/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
index 8caf41e..f3999a7 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
@@ -90,6 +90,8 @@ public class NettyConfiguration extends 
NettyServerBootstrapConfiguration implem
     private long producerPoolMinEvictableIdle = 5 * 60 * 1000L;
     @UriParam
     private boolean producerPoolEnabled = true;
+    @UriParam
+    private boolean udpConnectionlessSending;
 
     /**
      * Returns a copy of this configuration
@@ -445,6 +447,14 @@ public class NettyConfiguration extends 
NettyServerBootstrapConfiguration implem
     public void setProducerPoolEnabled(boolean producerPoolEnabled) {
         this.producerPoolEnabled = producerPoolEnabled;
     }
+    
+    public boolean isUdpConnectionlessSending() {
+        return udpConnectionlessSending;
+    }
+
+    public void setUdpConnectionlessSending(boolean udpConnectionlessSending) {
+        this.udpConnectionlessSending = udpConnectionlessSending;
+    }
 
     private static <T> void addToHandlersList(List<T> configured, List<T> 
handlers, Class<T> handlerType) {
         if (handlers != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/62151db1/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index 5856050..8d54f0a 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -42,6 +42,7 @@ import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.SucceededChannelFuture;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
@@ -234,8 +235,14 @@ public class NettyProducer extends DefaultAsyncProducer {
         // setup state as attachment on the channel, so we can access the 
state later when needed
         channel.setAttachment(new NettyCamelState(producerCallback, exchange));
 
+        InetSocketAddress remoteAddress = null;
+        if (!isTcp() && configuration.isUdpConnectionlessSending()) {
+            // Need to specify the remoteAddress here
+            remoteAddress = new InetSocketAddress(configuration.getHost(), 
configuration.getPort()); 
+        }
+        
         // write body
-        NettyHelper.writeBodyAsync(LOG, channel, null, body, exchange, new 
ChannelFutureListener() {
+        NettyHelper.writeBodyAsync(LOG, channel, remoteAddress, body, 
exchange, new ChannelFutureListener() {
             public void operationComplete(ChannelFuture channelFuture) throws 
Exception {
                 LOG.trace("Operation complete {}", channelFuture);
                 if (!channelFuture.isSuccess()) {
@@ -398,9 +405,18 @@ public class NettyProducer extends DefaultAsyncProducer {
             connectionlessClientBootstrap.setPipelineFactory(pipelineFactory);
             // bind and store channel so we can close it when stopping
             Channel channel = connectionlessClientBootstrap.bind(new 
InetSocketAddress(0));
+            
             allChannels.add(channel);
-            answer = connectionlessClientBootstrap.connect(new 
InetSocketAddress(configuration.getHost(), configuration.getPort()));
-
+            // if udp connectionless sending is true we don't do a connect.
+            // we just send on the channel created with bind which means
+            // really fire and forget. You wont get an PortUnreachableException
+            // if no one is listen on the port
+            if (!configuration.isUdpConnectionlessSending()) {
+                answer = connectionlessClientBootstrap.connect(new 
InetSocketAddress(configuration.getHost(), configuration.getPort()));
+            } else {
+                answer = new SucceededChannelFuture(channel);
+            }
+            
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Created new UDP client bootstrap connecting to 
{}:{} with options: {}",
                        new Object[]{configuration.getHost(), 
configuration.getPort(), connectionlessClientBootstrap.getOptions()});
@@ -523,8 +539,14 @@ public class NettyProducer extends DefaultAsyncProducer {
 
         @Override
         public boolean validateObject(Channel channel) {
-            // we need a connected channel to be valid
-            boolean answer = channel.isConnected();
+            boolean answer = false;    
+            if (configuration.isUdpConnectionlessSending()) {
+                // we don't need check if the channel is connected
+                answer = channel.isOpen();
+            } else {
+                // we need a connected channel to be valid
+                answer = channel.isConnected();
+            }
             LOG.trace("Validating channel: {} -> {}", channel, answer);
             return answer;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/62151db1/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java
 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java
index 3f598bc..59d4fd2 100644
--- 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java
+++ 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java
@@ -53,5 +53,32 @@ public class NettyComponentWithConfigurationTest extends 
CamelTestSupport {
         assertEquals(4455, e1.getConfiguration().getPort());
         assertEquals(5566, e2.getConfiguration().getPort());
     }
+    
+    @Test
+    public void testNettyComponentUdpWithConfiguration() throws Exception {
+        NettyComponent comp = context.getComponent("netty", 
NettyComponent.class);
+
+        NettyConfiguration cfg = new NettyConfiguration();
+
+        comp.setConfiguration(cfg);
+        assertSame(cfg, comp.getConfiguration());
+
+        NettyEndpoint e1 = (NettyEndpoint) 
comp.createEndpoint("netty://udp://localhost:8601?sync=false");
+        NettyEndpoint e2 = (NettyEndpoint) 
comp.createEndpoint("netty://udp://localhost:8602?sync=false&udpConnectionlessSending=true");
+
+        // should not be same
+        assertNotSame(e1, e2);
+        assertNotSame(e1.getConfiguration(), e2.getConfiguration());
+
+        // both endpoints are sync=false
+        assertEquals(false, e1.getConfiguration().isSync());
+        assertEquals(false, e2.getConfiguration().isSync());
+        // if not set it should be false
+        assertEquals(false, 
e1.getConfiguration().isUdpConnectionlessSending());
+        assertEquals(true, e2.getConfiguration().isUdpConnectionlessSending());
+
+        assertEquals(8601, e1.getConfiguration().getPort());
+        assertEquals(8602, e2.getConfiguration().getPort());
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/62151db1/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java
 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java
new file mode 100644
index 0000000..aef3ae2
--- /dev/null
+++ 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.net.InetSocketAddress;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.util.CharsetUtil;
+import org.junit.Test;
+
+
+public class NettyUdpConnectedSendTest extends BaseNettyTest {
+    private static final String SEND_STRING = "***<We all love camel>***";
+    private static final int SEND_COUNT = 20;
+    private int receivedCount;
+    private ConnectionlessBootstrap bootstrap;
+
+    public void createNettyUdpReceiver() {
+        bootstrap = new ConnectionlessBootstrap(new 
NioDatagramChannelFactory());
+        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            @Override
+            public ChannelPipeline getPipeline() throws Exception {
+                ChannelPipeline channelPipeline = Channels.pipeline();
+                channelPipeline.addLast("StringDecoder", new 
StringDecoder(CharsetUtil.UTF_8));
+                channelPipeline.addLast("ContentHandler", new 
ContentHandler());
+                return channelPipeline;
+            }
+        });
+
+    }
+
+
+    public void bind() {
+        bootstrap.bind(new InetSocketAddress(8601));
+    }
+
+    public void stop() {
+        bootstrap.shutdown();
+    }
+
+    @Test
+    public void sendConnectedUdp() throws Exception {
+        createNettyUdpReceiver();
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                bind();
+            }
+        });
+        t.start();
+        Thread.sleep(1000);
+        for (int i = 0; i < SEND_COUNT; ++i) {
+            template.sendBody("direct:in", SEND_STRING);
+        }
+        Thread.sleep(1000);
+        stop();
+        assertTrue("We should have received some datagrams", receivedCount > 
0);
+    }
+
+    @Test
+    public void sendConnectedWithoutReceiver() throws Exception {
+        int exceptionCount = 0;
+        for (int i = 0; i < SEND_COUNT; ++i) {
+            try {
+                template.sendBody("direct:in", SEND_STRING);
+            } catch (Exception ex) {
+                ++exceptionCount;
+            }
+        }
+        assertTrue("There should at least one exception because port is 
unreachable", exceptionCount > 0);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("direct:in").to("netty:udp://localhost:8601?sync=false&textline=true");
+            }
+        };
+    }
+
+    public class ContentHandler extends SimpleChannelUpstreamHandler {
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent 
messageEvent) throws Exception {
+            String s = (String)messageEvent.getMessage();
+            receivedCount++;
+            assertEquals(SEND_STRING, s.trim());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/62151db1/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectionlessSendTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectionlessSendTest.java
 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectionlessSendTest.java
new file mode 100644
index 0000000..bd36e4a
--- /dev/null
+++ 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectionlessSendTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.net.InetSocketAddress;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.util.CharsetUtil;
+import org.junit.Test;
+
+
+public class NettyUdpConnectionlessSendTest extends BaseNettyTest {
+    private static final String SEND_STRING = "***<We all love camel>***";
+    private static final int SEND_COUNT = 20;
+    private int receivedCount;
+    private ConnectionlessBootstrap bootstrap;
+
+    public void createNettyUdpReceiver() {
+        bootstrap = new ConnectionlessBootstrap(new 
NioDatagramChannelFactory());
+        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            @Override
+            public ChannelPipeline getPipeline() throws Exception {
+                ChannelPipeline channelPipeline = Channels.pipeline();
+                channelPipeline.addLast("StringDecoder", new 
StringDecoder(CharsetUtil.UTF_8));
+                channelPipeline.addLast("ContentHandler", new 
ContentHandler());
+                return channelPipeline;
+            }
+        });
+
+    }
+
+
+    public void bind() {
+        bootstrap.bind(new InetSocketAddress(8601));
+    }
+
+    public void stop() {
+        bootstrap.shutdown();
+    }
+
+    @Test
+    public void sendConnectionlessUdp() throws Exception {
+        createNettyUdpReceiver();
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                bind();
+            }
+        });
+        t.start();
+        Thread.sleep(1000);
+        for (int i = 0; i < SEND_COUNT; ++i) {
+            template.sendBody("direct:in", SEND_STRING);
+        }
+        Thread.sleep(1000);
+        stop();
+        assertTrue("We should have received some datagrams", receivedCount > 
0);
+
+    }
+
+    @Test
+    public void sendWithoutReceiver() throws Exception {
+        int exceptionCount = 0;
+        for (int i = 0; i < SEND_COUNT; ++i) {
+            try {
+                template.sendBody("direct:in", SEND_STRING);
+            } catch (Exception ex) {
+                ++exceptionCount;
+            }
+        }
+        assertEquals("No exception should occur", 0, exceptionCount);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("direct:in").to("netty:udp://localhost:8601?sync=false&textline=true&udpConnectionlessSending=true");
+            }
+        };
+    }
+
+    public class ContentHandler extends SimpleChannelUpstreamHandler {
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent 
messageEvent) throws Exception {
+            String s = (String)messageEvent.getMessage();
+            receivedCount++;
+            assertEquals(SEND_STRING, s.trim());
+        }
+    }
+}

Reply via email to