CAMEL-1077 Added remote address support to camel-netty consumer with thanks to yuruki
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/057e95ff Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/057e95ff Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/057e95ff Branch: refs/heads/master Commit: 057e95ffa9da5b3a86dc0df00322c32980b3a7d4 Parents: 0e50f2c Author: Willem Jiang <willem.ji...@gmail.com> Authored: Wed Feb 11 17:47:13 2015 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Wed Feb 11 17:48:11 2015 +0800 ---------------------------------------------------------------------- ...lientModeTCPNettyServerBootstrapFactory.java | 250 +++++++++++++++++++ .../component/netty/NettyConfiguration.java | 10 + .../camel/component/netty/NettyConsumer.java | 6 +- .../netty/NettyConsumerClientModeTest.java | 157 ++++++++++++ 4 files changed, 422 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/057e95ff/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientModeTCPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientModeTCPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientModeTCPNettyServerBootstrapFactory.java new file mode 100644 index 0000000..1b06127 --- /dev/null +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientModeTCPNettyServerBootstrapFactory.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelException; +import org.apache.camel.support.ServiceSupport; +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.ChannelGroupFuture; +import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.channel.socket.nio.BossPool; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.jboss.netty.channel.socket.nio.WorkerPool; +import org.jboss.netty.util.HashedWheelTimer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link org.apache.camel.component.netty.NettyServerBootstrapFactory} which is used by a single consumer (not shared). + */ +public class ClientModeTCPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory { + + protected static final Logger LOG = LoggerFactory.getLogger(ClientModeTCPNettyServerBootstrapFactory.class); + private final ChannelGroup allChannels; + private CamelContext camelContext; + private ThreadFactory threadFactory; + private NettyServerBootstrapConfiguration configuration; + private ChannelPipelineFactory pipelineFactory; + private ChannelFactory channelFactory; + private ClientBootstrap serverBootstrap; + private Channel channel; + private BossPool bossPool; + private WorkerPool workerPool; + + public ClientModeTCPNettyServerBootstrapFactory() { + this.allChannels = new DefaultChannelGroup(ClientModeTCPNettyServerBootstrapFactory.class.getName()); + } + + public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) { + this.camelContext = camelContext; + this.configuration = configuration; + this.pipelineFactory = pipelineFactory; + } + + public void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) { + this.threadFactory = threadFactory; + this.configuration = configuration; + this.pipelineFactory = pipelineFactory; + } + + public void addChannel(Channel channel) { + allChannels.add(channel); + } + + public void removeChannel(Channel channel) { + allChannels.remove(channel); + } + + public void addConsumer(NettyConsumer consumer) { + // does not allow sharing + } + + public void removeConsumer(NettyConsumer consumer) { + // does not allow sharing + } + + @Override + protected void doStart() throws Exception { + if (camelContext == null && threadFactory == null) { + throw new IllegalArgumentException("Either CamelContext or ThreadFactory must be set on " + this); + } + startServerBootstrap(); + } + + @Override + protected void doStop() throws Exception { + stopServerBootstrap(); + } + + @Override + protected void doResume() throws Exception { + if (channel != null) { + LOG.debug("ServerBootstrap connecting to {}:{}", configuration.getHost(), configuration.getPort()); + ChannelFuture future = channel.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + future.awaitUninterruptibly(); + if (!future.isSuccess()) { + // if we cannot connect, then re-create channel + allChannels.remove(channel); + ChannelFuture connectFuture = serverBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + channel = openChannel(connectFuture); + allChannels.add(channel); + } + } + } + + @Override + protected void doSuspend() throws Exception { + if (channel != null) { + LOG.debug("ServerBootstrap disconnecting from {}:{}", configuration.getHost(), configuration.getPort()); + ChannelFuture future = channel.disconnect(); + future.awaitUninterruptibly(); + } + } + + protected void startServerBootstrap() { + // prefer using explicit configured thread pools + BossPool bp = configuration.getBossPool(); + WorkerPool wp = configuration.getWorkerPool(); + + if (bp == null) { + // create new pool which we should shutdown when stopping as its not shared + bossPool = new NettyClientBossPoolBuilder() + .withTimer(new HashedWheelTimer()) + .withBossCount(configuration.getBossCount()) + .withName("NettyClientTCPBoss") + .build(); + bp = bossPool; + } + if (wp == null) { + // create new pool which we should shutdown when stopping as its not shared + workerPool = new NettyWorkerPoolBuilder() + .withWorkerCount(configuration.getWorkerCount()) + .withName("NettyServerTCPWorker") + .build(); + wp = workerPool; + } + + channelFactory = new NioClientSocketChannelFactory(bp, wp); + + serverBootstrap = new ClientBootstrap(channelFactory); + serverBootstrap.setOption("keepAlive", configuration.isKeepAlive()); + serverBootstrap.setOption("tcpNoDelay", configuration.isTcpNoDelay()); + serverBootstrap.setOption("reuseAddress", configuration.isReuseAddress()); + serverBootstrap.setOption("connectTimeoutMillis", configuration.getConnectTimeout()); + if (configuration.getBacklog() > 0) { + serverBootstrap.setOption("backlog", configuration.getBacklog()); + } + + // set any additional netty options + if (configuration.getOptions() != null) { + for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) { + serverBootstrap.setOption(entry.getKey(), entry.getValue()); + } + } + + LOG.debug("Created ServerBootstrap {} with options: {}", serverBootstrap, serverBootstrap.getOptions()); + + // set the pipeline factory, which creates the pipeline for each newly created channels + serverBootstrap.setPipelineFactory(pipelineFactory); + + LOG.info("ServerBootstrap connecting to {}:{}", configuration.getHost(), configuration.getPort()); + ChannelFuture connectFuture = serverBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + try { + channel = openChannel(connectFuture); + } catch (Exception e) { + e.printStackTrace(); + } + } + + protected Channel openChannel(ChannelFuture channelFuture) throws Exception { + // blocking for channel to be done + if (LOG.isTraceEnabled()) { + LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout()); + } + // here we need to wait it in other thread + final CountDownLatch channelLatch = new CountDownLatch(1); + channelFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture cf) throws Exception { + channelLatch.countDown(); + } + }); + + try { + channelLatch.await(configuration.getConnectTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + throw new CamelException("Interrupted while waiting for " + "connection to " + + configuration.getAddress()); + } + + if (!channelFuture.isDone() || !channelFuture.isSuccess()) { + ConnectException cause = new ConnectException("Cannot connect to " + configuration.getAddress()); + if (channelFuture.getCause() != null) { + cause.initCause(channelFuture.getCause()); + } + throw cause; + } + Channel answer = channelFuture.getChannel(); + // to keep track of all channels in use + allChannels.add(answer); + + if (LOG.isDebugEnabled()) { + LOG.debug("Creating connector to address: {}", configuration.getAddress()); + } + return answer; + } + + protected void stopServerBootstrap() { + // close all channels + LOG.info("ServerBootstrap disconnecting from {}:{}", configuration.getHost(), configuration.getPort()); + + LOG.trace("Closing {} channels", allChannels.size()); + ChannelGroupFuture future = allChannels.close(); + future.awaitUninterruptibly(); + + // close server external resources + if (channelFactory != null) { + channelFactory.releaseExternalResources(); + channelFactory = null; + } + + // and then shutdown the thread pools + if (bossPool != null) { + bossPool.shutdown(); + bossPool = null; + } + if (workerPool != null) { + workerPool.shutdown(); + workerPool = null; + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/057e95ff/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java index a53e212..9f0973e 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java @@ -92,6 +92,8 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem private boolean producerPoolEnabled = true; @UriParam(defaultValue = "false") private boolean udpConnectionlessSending; + @UriParam(defaultValue = "false") + private boolean clientMode; /** * Returns a copy of this configuration @@ -455,6 +457,14 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem public void setUdpConnectionlessSending(boolean udpConnectionlessSending) { this.udpConnectionlessSending = udpConnectionlessSending; } + + public boolean isClientMode() { + return clientMode; + } + + public void setClientMode(boolean clientMode) { + this.clientMode = clientMode; + } private static <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) { if (handlers != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/057e95ff/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java index fa8b06a..84924da 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java @@ -59,7 +59,11 @@ public class NettyConsumer extends DefaultConsumer { } if (isTcp()) { - nettyServerBootstrapFactory = new SingleTCPNettyServerBootstrapFactory(); + if (configuration.isClientMode()) { + nettyServerBootstrapFactory = new ClientModeTCPNettyServerBootstrapFactory(); + } else { + nettyServerBootstrapFactory = new SingleTCPNettyServerBootstrapFactory(); + } } else { nettyServerBootstrapFactory = new SingleUDPNettyServerBootstrapFactory(); } http://git-wip-us.apache.org/repos/asf/camel/blob/057e95ff/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConsumerClientModeTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConsumerClientModeTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConsumerClientModeTest.java new file mode 100644 index 0000000..0f71042 --- /dev/null +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConsumerClientModeTest.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + + +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.concurrent.Executors; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.SimpleChannelHandler; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.jboss.netty.handler.codec.frame.Delimiters; +import org.jboss.netty.util.CharsetUtil; +import org.junit.Test; + +public class NettyConsumerClientModeTest extends BaseNettyTest { + private static final ChannelBuffer DATA = ChannelBuffers.copiedBuffer("Willem".getBytes(CharsetUtil.UTF_8)); + private MyServer server; + + + public void startNettyServer() { + server = new MyServer(getPort()); + server.start(); + } + + public void shutdownServer() { + if (server != null) { + server.shutdown(); + } + } + @Test + public void testNettyRoute() throws Exception { + try { + startNettyServer(); + MockEndpoint receive = context.getEndpoint("mock:receive", MockEndpoint.class); + receive.expectedBodiesReceived("Bye Willem"); + context.startRoute("client"); + receive.assertIsSatisfied(); + } finally { + shutdownServer(); + } + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("netty:tcp://localhost:{{port}}?textline=true&clientMode=true").id("client") + .process(new Processor() { + public void process(final Exchange exchange) { + String body = exchange.getIn().getBody(String.class); + exchange.getOut().setBody("Bye " + body); + } + }).to("mock:receive").noAutoStartup(); + } + }; + } + + private static class MyServer { + private int port; + private ServerBootstrap bootstrap; + public MyServer(int port) { + this.port = port; + } + public void start() { + // Configure the server. + bootstrap = new ServerBootstrap( + new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(), + Executors.newCachedThreadPool())); + // Set up the event pipeline factory. + bootstrap.setPipelineFactory(new ServerPipelineFactory()); + // Bind and start to accept incoming connections. + bootstrap.bind(new InetSocketAddress(port)); + } + + public void shutdown() { + bootstrap.shutdown(); + } + + } + + private static class ServerHandler extends SimpleChannelHandler { + + @Override + public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { + Channel ch = e.getChannel(); + ch.write(DATA); + ChannelFuture f = ch.write(Delimiters.lineDelimiter()[0]); + + f.addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture future) { + Channel ch = future.getChannel(); + ch.close(); + } + }); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + e.getCause().printStackTrace(); + e.getChannel().close(); + } + } + + private static class ServerPipelineFactory implements ChannelPipelineFactory { + + public ChannelPipeline getPipeline() { + ChannelPipeline p = Channels.pipeline(); + Charset charset = CharsetUtil.UTF_8; + + ChannelBuffer[] delimiters = Delimiters.nulDelimiter(); + + // setup the textline encoding and decoding + p.addLast("decoder1", ChannelHandlerFactories.newDelimiterBasedFrameDecoder(1024 * 8, delimiters).newChannelHandler()); + p.addLast("decoder2", ChannelHandlerFactories.newStringDecoder(charset).newChannelHandler()); + + p.addLast("encoder", ChannelHandlerFactories.newStringEncoder(charset).newChannelHandler()); + + p.addLast("handler", new ServerHandler()); + return p; + } + } + +}