CAMEL-1077 Added remote address support to camel-netty consumer with thanks to 
yuruki


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/057e95ff
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/057e95ff
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/057e95ff

Branch: refs/heads/master
Commit: 057e95ffa9da5b3a86dc0df00322c32980b3a7d4
Parents: 0e50f2c
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Wed Feb 11 17:47:13 2015 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Wed Feb 11 17:48:11 2015 +0800

----------------------------------------------------------------------
 ...lientModeTCPNettyServerBootstrapFactory.java | 250 +++++++++++++++++++
 .../component/netty/NettyConfiguration.java     |  10 +
 .../camel/component/netty/NettyConsumer.java    |   6 +-
 .../netty/NettyConsumerClientModeTest.java      | 157 ++++++++++++
 4 files changed, 422 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/057e95ff/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientModeTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientModeTCPNettyServerBootstrapFactory.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientModeTCPNettyServerBootstrapFactory.java
new file mode 100644
index 0000000..1b06127
--- /dev/null
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientModeTCPNettyServerBootstrapFactory.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelException;
+import org.apache.camel.support.ServiceSupport;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.camel.component.netty.NettyServerBootstrapFactory} 
which is used by a single consumer (not shared).
+ */
+public class ClientModeTCPNettyServerBootstrapFactory extends ServiceSupport 
implements NettyServerBootstrapFactory {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(ClientModeTCPNettyServerBootstrapFactory.class);
+    private final ChannelGroup allChannels;
+    private CamelContext camelContext;
+    private ThreadFactory threadFactory;
+    private NettyServerBootstrapConfiguration configuration;
+    private ChannelPipelineFactory pipelineFactory;
+    private ChannelFactory channelFactory;
+    private ClientBootstrap serverBootstrap;
+    private Channel channel;
+    private BossPool bossPool;
+    private WorkerPool workerPool;
+
+    public ClientModeTCPNettyServerBootstrapFactory() {
+        this.allChannels = new 
DefaultChannelGroup(ClientModeTCPNettyServerBootstrapFactory.class.getName());
+    }
+
+    public void init(CamelContext camelContext, 
NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory 
pipelineFactory) {
+        this.camelContext = camelContext;
+        this.configuration = configuration;
+        this.pipelineFactory = pipelineFactory;
+    }
+
+    public void init(ThreadFactory threadFactory, 
NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory 
pipelineFactory) {
+        this.threadFactory = threadFactory;
+        this.configuration = configuration;
+        this.pipelineFactory = pipelineFactory;
+    }
+
+    public void addChannel(Channel channel) {
+        allChannels.add(channel);
+    }
+
+    public void removeChannel(Channel channel) {
+        allChannels.remove(channel);
+    }
+
+    public void addConsumer(NettyConsumer consumer) {
+        // does not allow sharing
+    }
+
+    public void removeConsumer(NettyConsumer consumer) {
+        // does not allow sharing
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (camelContext == null && threadFactory == null) {
+            throw new IllegalArgumentException("Either CamelContext or 
ThreadFactory must be set on " + this);
+        }
+        startServerBootstrap();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        stopServerBootstrap();
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        if (channel != null) {
+            LOG.debug("ServerBootstrap connecting to {}:{}", 
configuration.getHost(), configuration.getPort());
+            ChannelFuture future = channel.connect(new 
InetSocketAddress(configuration.getHost(), configuration.getPort()));
+            future.awaitUninterruptibly();
+            if (!future.isSuccess()) {
+                // if we cannot connect, then re-create channel
+                allChannels.remove(channel);
+                ChannelFuture connectFuture = serverBootstrap.connect(new 
InetSocketAddress(configuration.getHost(), configuration.getPort()));
+                channel = openChannel(connectFuture);
+                allChannels.add(channel);
+            }
+        }
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        if (channel != null) {
+            LOG.debug("ServerBootstrap disconnecting from {}:{}", 
configuration.getHost(), configuration.getPort());
+            ChannelFuture future = channel.disconnect();
+            future.awaitUninterruptibly();
+        }
+    }
+
+    protected void startServerBootstrap() {
+        // prefer using explicit configured thread pools
+        BossPool bp = configuration.getBossPool();
+        WorkerPool wp = configuration.getWorkerPool();
+
+        if (bp == null) {
+            // create new pool which we should shutdown when stopping as its 
not shared
+            bossPool = new NettyClientBossPoolBuilder()
+                    .withTimer(new HashedWheelTimer())
+                    .withBossCount(configuration.getBossCount())
+                    .withName("NettyClientTCPBoss")
+                    .build();
+            bp = bossPool;
+        }
+        if (wp == null) {
+            // create new pool which we should shutdown when stopping as its 
not shared
+            workerPool = new NettyWorkerPoolBuilder()
+                    .withWorkerCount(configuration.getWorkerCount())
+                    .withName("NettyServerTCPWorker")
+                    .build();
+            wp = workerPool;
+        }
+
+        channelFactory = new NioClientSocketChannelFactory(bp, wp);
+
+        serverBootstrap = new ClientBootstrap(channelFactory);
+        serverBootstrap.setOption("keepAlive", configuration.isKeepAlive());
+        serverBootstrap.setOption("tcpNoDelay", configuration.isTcpNoDelay());
+        serverBootstrap.setOption("reuseAddress", 
configuration.isReuseAddress());
+        serverBootstrap.setOption("connectTimeoutMillis", 
configuration.getConnectTimeout());
+        if (configuration.getBacklog() > 0) {
+            serverBootstrap.setOption("backlog", configuration.getBacklog());
+        }
+
+        // set any additional netty options
+        if (configuration.getOptions() != null) {
+            for (Map.Entry<String, Object> entry : 
configuration.getOptions().entrySet()) {
+                serverBootstrap.setOption(entry.getKey(), entry.getValue());
+            }
+        }
+
+        LOG.debug("Created ServerBootstrap {} with options: {}", 
serverBootstrap, serverBootstrap.getOptions());
+
+        // set the pipeline factory, which creates the pipeline for each newly 
created channels
+        serverBootstrap.setPipelineFactory(pipelineFactory);
+
+        LOG.info("ServerBootstrap connecting to {}:{}", 
configuration.getHost(), configuration.getPort());
+        ChannelFuture connectFuture = serverBootstrap.connect(new 
InetSocketAddress(configuration.getHost(), configuration.getPort()));
+        try {
+            channel = openChannel(connectFuture);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    protected Channel openChannel(ChannelFuture channelFuture) throws 
Exception {
+        // blocking for channel to be done
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for operation to complete {} for {} millis", 
channelFuture, configuration.getConnectTimeout());
+        }
+        // here we need to wait it in other thread
+        final CountDownLatch channelLatch = new CountDownLatch(1);
+        channelFuture.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture cf) throws Exception {
+                channelLatch.countDown();
+            }
+        });
+
+        try {
+            channelLatch.await(configuration.getConnectTimeout(), 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ex) {
+            throw new CamelException("Interrupted while waiting for " + 
"connection to "
+                    + configuration.getAddress());
+        }
+
+        if (!channelFuture.isDone() || !channelFuture.isSuccess()) {
+            ConnectException cause = new ConnectException("Cannot connect to " 
+ configuration.getAddress());
+            if (channelFuture.getCause() != null) {
+                cause.initCause(channelFuture.getCause());
+            }
+            throw cause;
+        }
+        Channel answer = channelFuture.getChannel();
+        // to keep track of all channels in use
+        allChannels.add(answer);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating connector to address: {}", 
configuration.getAddress());
+        }
+        return answer;
+    }
+
+    protected void stopServerBootstrap() {
+        // close all channels
+        LOG.info("ServerBootstrap disconnecting from {}:{}", 
configuration.getHost(), configuration.getPort());
+
+        LOG.trace("Closing {} channels", allChannels.size());
+        ChannelGroupFuture future = allChannels.close();
+        future.awaitUninterruptibly();
+
+        // close server external resources
+        if (channelFactory != null) {
+            channelFactory.releaseExternalResources();
+            channelFactory = null;
+        }
+
+        // and then shutdown the thread pools
+        if (bossPool != null) {
+            bossPool.shutdown();
+            bossPool = null;
+        }
+        if (workerPool != null) {
+            workerPool.shutdown();
+            workerPool = null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/057e95ff/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
index a53e212..9f0973e 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
@@ -92,6 +92,8 @@ public class NettyConfiguration extends 
NettyServerBootstrapConfiguration implem
     private boolean producerPoolEnabled = true;
     @UriParam(defaultValue = "false")
     private boolean udpConnectionlessSending;
+    @UriParam(defaultValue = "false") 
+    private boolean clientMode;
 
     /**
      * Returns a copy of this configuration
@@ -455,6 +457,14 @@ public class NettyConfiguration extends 
NettyServerBootstrapConfiguration implem
     public void setUdpConnectionlessSending(boolean udpConnectionlessSending) {
         this.udpConnectionlessSending = udpConnectionlessSending;
     }
+    
+    public boolean isClientMode() {
+        return clientMode;
+    }
+
+    public void setClientMode(boolean clientMode) {
+        this.clientMode = clientMode;
+    }
 
     private static <T> void addToHandlersList(List<T> configured, List<T> 
handlers, Class<T> handlerType) {
         if (handlers != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/057e95ff/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
index fa8b06a..84924da 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
@@ -59,7 +59,11 @@ public class NettyConsumer extends DefaultConsumer {
             }
 
             if (isTcp()) {
-                nettyServerBootstrapFactory = new 
SingleTCPNettyServerBootstrapFactory();
+                if (configuration.isClientMode()) {
+                    nettyServerBootstrapFactory = new 
ClientModeTCPNettyServerBootstrapFactory();
+                } else {
+                    nettyServerBootstrapFactory = new 
SingleTCPNettyServerBootstrapFactory();
+                }
             } else {
                 nettyServerBootstrapFactory = new 
SingleUDPNettyServerBootstrapFactory();
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/057e95ff/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConsumerClientModeTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConsumerClientModeTest.java
 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConsumerClientModeTest.java
new file mode 100644
index 0000000..0f71042
--- /dev/null
+++ 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConsumerClientModeTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+import org.jboss.netty.util.CharsetUtil;
+import org.junit.Test;
+
+public class NettyConsumerClientModeTest extends BaseNettyTest {
+    private static final ChannelBuffer DATA = 
ChannelBuffers.copiedBuffer("Willem".getBytes(CharsetUtil.UTF_8));
+    private MyServer server;
+    
+   
+    public void startNettyServer() {
+        server = new MyServer(getPort());
+        server.start();
+    }
+   
+    public void shutdownServer() {
+        if (server != null) {
+            server.shutdown();
+        }
+    }
+    @Test
+    public void testNettyRoute() throws Exception {
+        try {
+            startNettyServer();
+            MockEndpoint receive = context.getEndpoint("mock:receive", 
MockEndpoint.class);
+            receive.expectedBodiesReceived("Bye Willem");
+            context.startRoute("client");
+            receive.assertIsSatisfied();
+        } finally {
+            shutdownServer();
+        }
+        
+    }
+      
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("netty:tcp://localhost:{{port}}?textline=true&clientMode=true").id("client")
+                .process(new Processor() {
+                    public void process(final Exchange exchange) {
+                        String body = exchange.getIn().getBody(String.class);
+                        exchange.getOut().setBody("Bye " + body);
+                    }
+                }).to("mock:receive").noAutoStartup();
+            }
+        };
+    }
+    
+    private static class MyServer {
+        private int port;
+        private ServerBootstrap bootstrap;
+        public MyServer(int port) {
+            this.port = port;
+        }
+        public void start() {
+            // Configure the server.
+            bootstrap = new ServerBootstrap(
+                new NioServerSocketChannelFactory(
+                    Executors.newCachedThreadPool(), 
+                    Executors.newCachedThreadPool()));
+            // Set up the event pipeline factory.
+            bootstrap.setPipelineFactory(new ServerPipelineFactory());
+            // Bind and start to accept incoming connections.
+            bootstrap.bind(new InetSocketAddress(port));
+        }
+        
+        public void shutdown() {
+            bootstrap.shutdown();
+        }
+        
+    }
+    
+    private static class ServerHandler extends SimpleChannelHandler {
+
+        @Override
+        public void channelConnected(ChannelHandlerContext ctx, 
ChannelStateEvent e) {
+            Channel ch = e.getChannel();
+            ch.write(DATA);
+            ChannelFuture f = ch.write(Delimiters.lineDelimiter()[0]);
+            
+            f.addListener(new ChannelFutureListener() {
+                public void operationComplete(ChannelFuture future) {
+                    Channel ch = future.getChannel();
+                    ch.close();
+                }
+            });
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent 
e) {
+            e.getCause().printStackTrace();
+            e.getChannel().close();
+        }
+    }
+    
+    private static class ServerPipelineFactory implements 
ChannelPipelineFactory {
+
+        public ChannelPipeline getPipeline() {
+            ChannelPipeline p = Channels.pipeline();
+            Charset charset = CharsetUtil.UTF_8;
+           
+            ChannelBuffer[] delimiters = Delimiters.nulDelimiter();
+           
+            // setup the textline encoding and decoding
+            p.addLast("decoder1", 
ChannelHandlerFactories.newDelimiterBasedFrameDecoder(1024 * 8, 
delimiters).newChannelHandler());
+            p.addLast("decoder2", 
ChannelHandlerFactories.newStringDecoder(charset).newChannelHandler());
+            
+            p.addLast("encoder", 
ChannelHandlerFactories.newStringEncoder(charset).newChannelHandler());
+            
+            p.addLast("handler", new ServerHandler());
+            return p;
+        }
+    }
+
+}

Reply via email to