Repository: camel Updated Branches: refs/heads/master b3a819327 -> bfe3ed08f
CAMEL-9859: Add Netty4 Channel Options back. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1641c0f4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1641c0f4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1641c0f4 Branch: refs/heads/master Commit: 1641c0f465a2b08d658d449feb8f235ca1abd492 Parents: b3a8193 Author: jpoth <jp...@redhat.com> Authored: Tue Apr 12 11:36:15 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Apr 13 11:25:52 2016 +0200 ---------------------------------------------------------------------- .../SingleTCPNettyServerBootstrapFactory.java | 24 +++++-- .../SingleUDPNettyServerBootstrapFactory.java | 37 +++++++---- ...PMessageLargerThanDefaultBufferSizeTest.java | 70 ++++++++++++++++++++ 3 files changed, 113 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1641c0f4/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java index 49ddbc8..fb764fe 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java @@ -17,6 +17,7 @@ package org.apache.camel.component.netty4; import java.net.InetSocketAddress; +import java.util.Map; import java.util.concurrent.ThreadFactory; import io.netty.bootstrap.ServerBootstrap; @@ -33,6 +34,8 @@ import io.netty.util.concurrent.ImmediateEventExecutor; import org.apache.camel.CamelContext; import org.apache.camel.Suspendable; import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.CamelContextHelper; +import org.apache.camel.util.EndpointHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,12 +166,23 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme serverBootstrap.option(ChannelOption.SO_BACKLOG, configuration.getBacklog()); } - // TODO set any additional netty options and child options - /*if (configuration.getOptions() != null) { - for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) { - serverBootstrap.setOption(entry.getKey(), entry.getValue()); + Map<String, Object> options = configuration.getOptions(); + if (options != null) { + for (Map.Entry<String, Object> entry : options.entrySet()) { + Object value = entry.getValue(); + ChannelOption<Object> option = ChannelOption.valueOf(entry.getKey()); + //For all netty options that aren't of type String + //TODO: find a way to add primitive Netty options without having to add them to the Camel registry. + if (EndpointHelper.isReferenceParameter(value.toString())) { + String name = ((String)value).substring(1); + Object o = CamelContextHelper.mandatoryLookup(camelContext, name);; + serverBootstrap.option(option, o); + } else { + serverBootstrap.option(ChannelOption.valueOf(entry.getKey()), value); + serverBootstrap.option(option, value); + } } - }*/ + } // set the pipeline factory, which creates the pipeline for each newly created channels serverBootstrap.childHandler(pipelineFactory); http://git-wip-us.apache.org/repos/asf/camel/blob/1641c0f4/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java index 5d9d5bd..4f862f2 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java @@ -18,6 +18,7 @@ package org.apache.camel.component.netty4; import java.net.InetSocketAddress; import java.net.NetworkInterface; +import java.util.Map; import java.util.concurrent.ThreadFactory; import io.netty.bootstrap.Bootstrap; @@ -26,6 +27,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.DatagramChannel; @@ -35,6 +37,8 @@ import org.apache.camel.CamelContext; import org.apache.camel.Suspendable; import org.apache.camel.component.netty4.util.SubnetUtils; import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.CamelContextHelper; +import org.apache.camel.util.EndpointHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,27 +136,34 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme bootstrap.option(ChannelOption.SO_BROADCAST, configuration.isBroadcast()); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.getConnectTimeout()); - // TODO need to find the right setting of below option // only set this if user has specified - /* if (configuration.getReceiveBufferSizePredictor() > 0) { - bootstrap.setOption("receiveBufferSizePredictorFactory", - new FixedReceiveBufferSizePredictorFactory(configuration.getReceiveBufferSizePredictor())); - }*/ + bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, + new FixedRecvByteBufAllocator(configuration.getReceiveBufferSizePredictor())); + } if (configuration.getBacklog() > 0) { bootstrap.option(ChannelOption.SO_BACKLOG, configuration.getBacklog()); } - //TODO need to check the additional netty options - /* - if (configuration.getOptions() != null) { - for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) { - connectionlessBootstrap.setOption(entry.getKey(), entry.getValue()); + Map<String, Object> options = configuration.getOptions(); + if (options != null) { + for (Map.Entry<String, Object> entry : options.entrySet()) { + Object value = entry.getValue(); + ChannelOption<Object> option = ChannelOption.valueOf(entry.getKey()); + //For all netty options that aren't of type String + //TODO: find a way to add primitive Netty options without having to add them to the Camel registry. + if (EndpointHelper.isReferenceParameter(value.toString())) { + String name = ((String)value).substring(1); + Object o = CamelContextHelper.mandatoryLookup(camelContext, name);; + bootstrap.option(option, o); + } else { + bootstrap.option(ChannelOption.valueOf(entry.getKey()), value); + bootstrap.option(option, value); + } } - }*/ - - LOG.debug("Created ConnectionlessBootstrap {}", bootstrap); + } + LOG.debug("Created Bootstrap {}", bootstrap); // set the pipeline factory, which creates the pipeline for each newly created channels bootstrap.handler(pipelineFactory); http://git-wip-us.apache.org/repos/asf/camel/blob/1641c0f4/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPMessageLargerThanDefaultBufferSizeTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPMessageLargerThanDefaultBufferSizeTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPMessageLargerThanDefaultBufferSizeTest.java new file mode 100644 index 0000000..77a681b --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPMessageLargerThanDefaultBufferSizeTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4; + +import io.netty.channel.ChannelOption; +import io.netty.channel.FixedRecvByteBufAllocator; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Test; + +public class NettyUDPMessageLargerThanDefaultBufferSizeTest extends BaseNettyTest { + + private byte[] getMessageBytes(int messageSize) { + byte[] msgBytes = new byte[messageSize]; + for (int i = 0; i < messageSize; i++) { + msgBytes[i] = 'A'; + } + return msgBytes; + } + + private void sendMessage(int messageSize) throws Exception { + byte[] msgBytes = getMessageBytes(messageSize); + + assertEquals(msgBytes.length, messageSize); + String message = new String(msgBytes); + + getMockEndpoint("mock:result").expectedBodiesReceived(message); + template.sendBody("netty4:udp://localhost:{{port}}?sync=false", message); + assertMockEndpointsSatisfied(); + } + + @Test + public void testSend2048Message() throws Exception { + //Will fail unless the buffer was increased correctly + sendMessage(2048); + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + FixedRecvByteBufAllocator fixedRecvByteBufAllocator = new FixedRecvByteBufAllocator(4096); + jndi.bind(ChannelOption.RCVBUF_ALLOCATOR.name(), fixedRecvByteBufAllocator); + return jndi; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("netty4:udp://localhost:{{port}}?option."+ ChannelOption.RCVBUF_ALLOCATOR.name() +"=#"+ChannelOption.RCVBUF_ALLOCATOR.name()) + .to("mock:result"); + } + }; + } +}