Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 4d6da3b7b -> 4475b798c


CAMEL-9859: Add Netty4 Channel Options back.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/eb2ea7bc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/eb2ea7bc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/eb2ea7bc

Branch: refs/heads/camel-2.17.x
Commit: eb2ea7bcff7382ff93cd10c9f6189b193bb0b8c2
Parents: 4d6da3b
Author: jpoth <jp...@redhat.com>
Authored: Tue Apr 12 11:36:15 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Apr 13 11:35:28 2016 +0200

----------------------------------------------------------------------
 .../SingleTCPNettyServerBootstrapFactory.java   | 24 +++++--
 .../SingleUDPNettyServerBootstrapFactory.java   | 37 +++++++----
 ...PMessageLargerThanDefaultBufferSizeTest.java | 70 ++++++++++++++++++++
 3 files changed, 113 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/eb2ea7bc/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
index 49ddbc8..fb764fe 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.netty4;
 
 import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.concurrent.ThreadFactory;
 
 import io.netty.bootstrap.ServerBootstrap;
@@ -33,6 +34,8 @@ import io.netty.util.concurrent.ImmediateEventExecutor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Suspendable;
 import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.EndpointHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -163,12 +166,23 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
             serverBootstrap.option(ChannelOption.SO_BACKLOG, configuration.getBacklog());
         }
 
-        // TODO set any additional netty options and child options
-        /*if (configuration.getOptions() != null) {
-            for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
-                serverBootstrap.setOption(entry.getKey(), entry.getValue());
+        Map<String, Object> options = configuration.getOptions();
+        if (options != null) {
+            for (Map.Entry<String, Object> entry : options.entrySet()) {
+                Object value = entry.getValue();
+                ChannelOption<Object> option = ChannelOption.valueOf(entry.getKey());
+                //For all netty options that aren't of type String
+                //TODO: find a way to add primitive Netty options without having to add them to the Camel registry.
+                if (EndpointHelper.isReferenceParameter(value.toString())) {
+                    String name =  ((String)value).substring(1);
+                    Object o = CamelContextHelper.mandatoryLookup(camelContext, name);;
+                    serverBootstrap.option(option, o);
+                } else {
+                    serverBootstrap.option(ChannelOption.valueOf(entry.getKey()), value);
+                    serverBootstrap.option(option, value);
+                }
             }
-        }*/
+        }
 
         // set the pipeline factory, which creates the pipeline for each newly created channels
         serverBootstrap.childHandler(pipelineFactory);

http://git-wip-us.apache.org/repos/asf/camel/blob/eb2ea7bc/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
index 5d9d5bd..4f862f2 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.netty4;
 
 import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
+import java.util.Map;
 import java.util.concurrent.ThreadFactory;
 
 import io.netty.bootstrap.Bootstrap;
@@ -26,6 +27,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.DatagramChannel;
@@ -35,6 +37,8 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Suspendable;
 import org.apache.camel.component.netty4.util.SubnetUtils;
 import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,27 +136,34 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
         bootstrap.option(ChannelOption.SO_BROADCAST, configuration.isBroadcast());
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.getConnectTimeout());
         
-        // TODO need to find the right setting of below option
         // only set this if user has specified
-        /*
         if (configuration.getReceiveBufferSizePredictor() > 0) {
-            bootstrap.setOption("receiveBufferSizePredictorFactory",
-                    new FixedReceiveBufferSizePredictorFactory(configuration.getReceiveBufferSizePredictor()));
-        }*/
+            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR,
+                    new FixedRecvByteBufAllocator(configuration.getReceiveBufferSizePredictor()));
+        }
         
         if (configuration.getBacklog() > 0) {
             bootstrap.option(ChannelOption.SO_BACKLOG, configuration.getBacklog());
         }
 
-        //TODO need to check the additional netty options
-        /*
-        if (configuration.getOptions() != null) {
-            for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
-                connectionlessBootstrap.setOption(entry.getKey(), entry.getValue());
+        Map<String, Object> options = configuration.getOptions();
+        if (options != null) {
+            for (Map.Entry<String, Object> entry : options.entrySet()) {
+                Object value = entry.getValue();
+                ChannelOption<Object> option = ChannelOption.valueOf(entry.getKey());
+                //For all netty options that aren't of type String
+                //TODO: find a way to add primitive Netty options without having to add them to the Camel registry.
+                if (EndpointHelper.isReferenceParameter(value.toString())) {
+                    String name =  ((String)value).substring(1);
+                    Object o = CamelContextHelper.mandatoryLookup(camelContext, name);;
+                    bootstrap.option(option, o);
+                } else {
+                    bootstrap.option(ChannelOption.valueOf(entry.getKey()), value);
+                    bootstrap.option(option, value);
+                }
             }
-        }*/
-
-        LOG.debug("Created ConnectionlessBootstrap {}", bootstrap);
+        }
+        LOG.debug("Created Bootstrap {}", bootstrap);
 
         // set the pipeline factory, which creates the pipeline for each newly created channels
         bootstrap.handler(pipelineFactory);

http://git-wip-us.apache.org/repos/asf/camel/blob/eb2ea7bc/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPMessageLargerThanDefaultBufferSizeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPMessageLargerThanDefaultBufferSizeTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPMessageLargerThanDefaultBufferSizeTest.java
new file mode 100644
index 0000000..77a681b
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPMessageLargerThanDefaultBufferSizeTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import io.netty.channel.ChannelOption;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class NettyUDPMessageLargerThanDefaultBufferSizeTest extends BaseNettyTest {
+
+    private byte[] getMessageBytes(int messageSize) {
+        byte[] msgBytes = new byte[messageSize];
+        for (int i = 0; i < messageSize; i++) {
+            msgBytes[i] = 'A';
+        }
+        return msgBytes;
+    }
+
+    private void sendMessage(int messageSize) throws Exception {
+        byte[] msgBytes = getMessageBytes(messageSize);
+
+        assertEquals(msgBytes.length, messageSize);
+        String message = new String(msgBytes);
+
+        getMockEndpoint("mock:result").expectedBodiesReceived(message);
+        template.sendBody("netty4:udp://localhost:{{port}}?sync=false", message);
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testSend2048Message() throws Exception {
+        //Will fail unless the buffer was increased correctly
+        sendMessage(2048);
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        FixedRecvByteBufAllocator fixedRecvByteBufAllocator = new FixedRecvByteBufAllocator(4096);
+        jndi.bind(ChannelOption.RCVBUF_ALLOCATOR.name(), fixedRecvByteBufAllocator);
+        return jndi;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("netty4:udp://localhost:{{port}}?option."+ ChannelOption.RCVBUF_ALLOCATOR.name() +"=#"+ChannelOption.RCVBUF_ALLOCATOR.name())
+                    .to("mock:result");
+            }
+        };
+    }
+}

Reply via email to