Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x e500ce34a -> a2eb47e1e


CAMEL-10244: Make connection establishment fully async


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7acb3750
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7acb3750
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7acb3750

Branch: refs/heads/camel-2.17.x
Commit: 7acb3750bd82bec64e255d39780f393ffac6f824
Parents: e500ce3
Author: Vitalii Tymchyshyn <v...@tym.im>
Authored: Sat Aug 13 13:25:19 2016 -0400
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Aug 15 17:38:58 2016 +0200

----------------------------------------------------------------------
 .../camel/component/netty4/NettyProducer.java   | 209 +++++++++++--------
 .../component/netty4/NettyTCPChainedTest.java   |  89 ++++++++
 2 files changed, 216 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7acb3750/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index a1b8d7f..4008422 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -60,7 +60,7 @@ public class NettyProducer extends DefaultAsyncProducer {
     private ClientInitializerFactory pipelineFactory;
     private CamelLogger noReplyLogger;
     private EventLoopGroup workerGroup;
-    private ObjectPool<Channel> pool;
+    private ObjectPool<ChannelFuture> pool;
     private Map<Channel, NettyCamelState> nettyCamelStatesMap = new 
ConcurrentHashMap<Channel, NettyCamelState>();
 
     public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration 
configuration) {
@@ -111,14 +111,14 @@ public class NettyProducer extends DefaultAsyncProducer {
             config.timeBetweenEvictionRunsMillis = 30 * 1000L;
             config.minEvictableIdleTimeMillis = 
configuration.getProducerPoolMinEvictableIdle();
             config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
-            pool = new GenericObjectPool<Channel>(new 
NettyProducerPoolableObjectFactory(), config);
+            pool = new GenericObjectPool<ChannelFuture>(new 
NettyProducerPoolableObjectFactory(), config);
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Created NettyProducer pool[maxActive={}, 
minIdle={}, maxIdle={}, minEvictableIdleTimeMillis={}] -> {}",
                         new Object[]{config.maxActive, config.minIdle, 
config.maxIdle, config.minEvictableIdleTimeMillis, pool});
             }
         } else {
-            pool = new SharedSingletonObjectPool<Channel>(new 
NettyProducerPoolableObjectFactory());
+            pool = new SharedSingletonObjectPool<ChannelFuture>(new 
NettyProducerPoolableObjectFactory());
             if (LOG.isDebugEnabled()) {
                 LOG.info("Created NettyProducer shared singleton pool -> {}", 
pool);
             }
@@ -134,8 +134,9 @@ public class NettyProducer extends DefaultAsyncProducer {
 
         if (!configuration.isLazyChannelCreation()) {
             // ensure the connection can be established when we start up
-            Channel channel = pool.borrowObject();
-            pool.returnObject(channel);
+            ChannelFuture channelFuture = pool.borrowObject();
+            channelFuture.get();
+            pool.returnObject(channelFuture);
         }
     }
 
@@ -197,16 +198,19 @@ public class NettyProducer extends DefaultAsyncProducer {
         }
 
         // get a channel from the pool
-        Channel existing = null;
+        ChannelFuture channelFuture = null;
+        Channel channel = null;
         try {
             if (getConfiguration().isReuseChannel()) {
-                existing = exchange.getProperty(NettyConstants.NETTY_CHANNEL, 
Channel.class);
+                channel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, 
Channel.class);
             }
-            if (existing == null) {
-                existing = pool.borrowObject();
-                if (existing != null) {
-                    LOG.trace("Got channel from pool {}", existing);
+            if (channel == null) {
+                channelFuture = pool.borrowObject();
+                if (channelFuture != null) {
+                    LOG.trace("Got channel request from pool {}", 
channelFuture);
                 }
+            } else {
+                channelFuture = channel.newSucceededFuture();
             }
         } catch (Exception e) {
             exchange.setException(e);
@@ -215,16 +219,22 @@ public class NettyProducer extends DefaultAsyncProducer {
         }
 
         // we must have a channel
-        if (existing == null) {
+        if (channelFuture == null) {
             exchange.setException(new CamelExchangeException("Cannot get 
channel from pool", exchange));
             callback.done(true);
             return true;
         }
 
+        channelFuture.addListener(new ChannelConnectedListener(exchange, 
callback, body));
+        return false;
+    }
+
+    public void processWithConnectedChannel(final Exchange exchange, 
AsyncCallback callback, ChannelFuture channelFuture, 
+            Object body) {
         // remember channel so we can reuse it
+        final Channel channel = channelFuture.channel();
         if (getConfiguration().isReuseChannel() && 
exchange.getProperty(NettyConstants.NETTY_CHANNEL) == null) {
-            final Channel channel = existing;
-            exchange.setProperty(NettyConstants.NETTY_CHANNEL, existing);
+            exchange.setProperty(NettyConstants.NETTY_CHANNEL, channel);
             // and defer closing the channel until we are done routing the 
exchange
             exchange.addOnCompletion(new SynchronizationAdapter() {
                 @Override
@@ -248,36 +258,22 @@ public class NettyProducer extends DefaultAsyncProducer {
                         NettyHelper.close(channel);
                     }
 
-                    try {
-                        // Only put the connected channel back to the pool
-                        if (channel.isActive()) {
-                            LOG.trace("Putting channel back to pool {}", 
channel);
-                            pool.returnObject(channel);
-                        } else {
-                            // and if its not active then invalidate it
-                            LOG.trace("Invalidating channel from pool {}", 
channel);
-                            pool.invalidateObject(channel);
-                        }
-                    } catch (Exception e) {
-                        LOG.warn("Error returning channel to pool " + channel 
+ ". This exception will be ignored.", e);
-                    }
+                    releaseChannel(channelFuture);
                 }
             });
         }
 
         if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) 
!= null) {
             long timeoutInMs = 
exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class);
-            ChannelHandler oldHandler = existing.pipeline().get("timeout");
+            ChannelHandler oldHandler = channel.pipeline().get("timeout");
             ReadTimeoutHandler newHandler = new 
ReadTimeoutHandler(timeoutInMs, TimeUnit.MILLISECONDS);
             if (oldHandler == null) {
-                existing.pipeline().addBefore("handler", "timeout", 
newHandler);
+                channel.pipeline().addBefore("handler", "timeout", newHandler);
             } else {
-                existing.pipeline().replace(oldHandler, "timeout", newHandler);
+                channel.pipeline().replace(oldHandler, "timeout", newHandler);
             }
         }
         
-        // need to declare as final
-        final Channel channel = existing;
         final AsyncCallback producerCallback;
 
         if (configuration.isReuseChannel()) {
@@ -285,7 +281,7 @@ public class NettyProducer extends DefaultAsyncProducer {
             // as when reuse channel is enabled it will put the channel back 
in the pool when exchange is done using on completion
             producerCallback = callback;
         } else {
-            producerCallback = new NettyProducerCallback(channel, callback);
+            producerCallback = new NettyProducerCallback(channelFuture, 
callback);
         }
 
         // setup state as attachment on the channel, so we can access the 
state later when needed
@@ -337,8 +333,6 @@ public class NettyProducer extends DefaultAsyncProducer {
             }
         });
 
-        // continue routing asynchronously
-        return false;
     }
 
     /**
@@ -466,30 +460,38 @@ public class NettyProducer extends DefaultAsyncProducer {
         }
     }
 
-    protected Channel openChannel(ChannelFuture channelFuture) throws 
Exception {
+    protected void notifyChannelOpen(ChannelFuture channelFuture) throws 
Exception {
         // blocking for channel to be done
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Waiting for operation to complete {} for {} millis", 
channelFuture, configuration.getConnectTimeout());
+            LOG.trace("Channel open finished with {}", channelFuture);
         }
 
-        // wait for the channel to be open (see io.netty.channel.ChannelFuture 
javadoc for example/recommendation)
-        channelFuture.awaitUninterruptibly();
+        if (channelFuture.isSuccess()) {
+            Channel answer = channelFuture.channel();
+            // to keep track of all channels in use
+            allChannels.add(answer);
 
-        if (!channelFuture.isDone() || !channelFuture.isSuccess()) {
-            ConnectException cause = new ConnectException("Cannot connect to " 
+ configuration.getAddress());
-            if (channelFuture.cause() != null) {
-                cause.initCause(channelFuture.cause());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Creating connector to address: {}", 
configuration.getAddress());
             }
-            throw cause;
         }
-        Channel answer = channelFuture.channel();
-        // to keep track of all channels in use
-        allChannels.add(answer);
+    }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Creating connector to address: {}", 
configuration.getAddress());
+    protected void releaseChannel(ChannelFuture channelFuture) {
+        Channel channel = channelFuture.channel();
+        try {
+            // Only put the connected channel back to the pool
+            if (channel.isActive()) {
+                LOG.trace("Putting channel back to pool {}", channel);
+                pool.returnObject(channelFuture);
+            } else {
+                // and if its not active then invalidate it
+                LOG.trace("Invalidating channel from pool {}", channel);
+                pool.invalidateObject(channelFuture);
+            }
+        } catch (Exception e) {
+            LOG.warn("Error returning channel to pool " + channel + ". This 
exception will be ignored.", e);
         }
-        return answer;
     }
 
     public NettyConfiguration getConfiguration() {
@@ -510,11 +512,11 @@ public class NettyProducer extends DefaultAsyncProducer {
      */
     private final class NettyProducerCallback implements AsyncCallback {
 
-        private final Channel channel;
+        private final ChannelFuture channelFuture;
         private final AsyncCallback callback;
 
-        private NettyProducerCallback(Channel channel, AsyncCallback callback) 
{
-            this.channel = channel;
+        private NettyProducerCallback(ChannelFuture channelFuture, 
AsyncCallback callback) {
+            this.channelFuture = channelFuture;
             this.callback = callback;
         }
 
@@ -522,17 +524,7 @@ public class NettyProducer extends DefaultAsyncProducer {
         public void done(boolean doneSync) {
             // put back in pool
             try {
-                // Only put the connected channel back to the pool
-                if (channel.isActive()) {
-                    LOG.trace("Putting channel back to pool {}", channel);
-                    pool.returnObject(channel);
-                } else {
-                    // and if its not active then invalidate it
-                    LOG.trace("Invalidating channel from pool {}", channel);
-                    pool.invalidateObject(channel);
-                }
-            } catch (Exception e) {
-                LOG.warn("Error returning channel to pool " + channel + ". 
This exception will be ignored.", e);
+                releaseChannel(channelFuture);
             } finally {
                 // ensure we call the delegated callback
                 callback.done(doneSync);
@@ -543,44 +535,97 @@ public class NettyProducer extends DefaultAsyncProducer {
     /**
      * Object factory to create {@link Channel} used by the pool.
      */
-    private final class NettyProducerPoolableObjectFactory implements 
PoolableObjectFactory<Channel> {
+    private final class NettyProducerPoolableObjectFactory implements 
PoolableObjectFactory<ChannelFuture> {
 
         @Override
-        public Channel makeObject() throws Exception {
-            ChannelFuture channelFuture = openConnection();
-            Channel answer = openChannel(channelFuture);
-            LOG.trace("Created channel: {}", answer);
-            return answer;
+        public ChannelFuture makeObject() throws Exception {
+            ChannelFuture channelFuture = openConnection().addListener(new 
ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws 
Exception {
+                    notifyChannelOpen(future);
+                }
+            });
+            LOG.trace("Requested channel: {}", channelFuture);
+            return channelFuture;
         }
 
         @Override
-        public void destroyObject(Channel channel) throws Exception {
-            LOG.trace("Destroying channel: {}", channel);
-            if (channel.isOpen()) {
-                NettyHelper.close(channel);
-            }
-            allChannels.remove(channel);
+        public void destroyObject(ChannelFuture channelFuture) throws 
Exception {
+            LOG.trace("Destroying channel request: {}", channelFuture);
+            channelFuture.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws 
Exception {
+                    Channel channel = future.channel();
+                    if (channel.isOpen()) {
+                        NettyHelper.close(channel);
+                    }
+                    allChannels.remove(channel);
+                }
+            });
+            channelFuture.cancel(false);
         }
 
         @Override
-        public boolean validateObject(Channel channel) {
-            // we need a connected channel to be valid
+        public boolean validateObject(ChannelFuture channelFuture) {
+            // we need a connecting or connected channel to be valid
+            if (!channelFuture.isDone()) {
+                LOG.trace("Validating connecting channel request: {} -> {}", 
channelFuture, true);
+                return true;
+            }
+            if (!channelFuture.isSuccess()) {
+                LOG.trace("Validating unsuccessful channel request: {} -> {}", 
channelFuture, false);
+                return false;
+            }
+            Channel channel = channelFuture.channel();
             boolean answer = channel.isActive();
             LOG.trace("Validating channel: {} -> {}", channel, answer);
             return answer;
         }
 
         @Override
-        public void activateObject(Channel channel) throws Exception {
+        public void activateObject(ChannelFuture channelFuture) throws 
Exception {
             // noop
-            LOG.trace("activateObject channel: {} -> {}", channel);
+            LOG.trace("activateObject channel request: {} -> {}", 
channelFuture);
         }
 
         @Override
-        public void passivateObject(Channel channel) throws Exception {
+        public void passivateObject(ChannelFuture channelFuture) throws 
Exception {
             // noop
-            LOG.trace("passivateObject channel: {} -> {}", channel);
+            LOG.trace("passivateObject channel request: {} -> {}", 
channelFuture);
         }
     }
 
+    /**
+     * Listener that fails the exchange if the connection attempt failed, and otherwise continues processing it
+     */
+    private class ChannelConnectedListener implements ChannelFutureListener {
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+        private final Object body;
+
+        public ChannelConnectedListener(Exchange exchange, AsyncCallback 
callback, Object body) {
+            this.exchange = exchange;
+            this.callback = callback;
+            this.body = body;
+        }
+
+        @Override
+        public void operationComplete(ChannelFuture future) {
+            if (!future.isDone() || !future.isSuccess()) {
+                ConnectException cause = new ConnectException("Cannot connect 
to " + configuration.getAddress());
+                if (future.cause() != null) {
+                    cause.initCause(future.cause());
+                }
+                exchange.setException(cause);
+                callback.done(false);
+                return; // callback already completed; must not fall through and process a failed connection
+            }
+            try {
+                processWithConnectedChannel(exchange, callback, future, body);
+            } catch (Throwable e) {
+                exchange.setException(e);
+                callback.done(false);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/7acb3750/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyTCPChainedTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyTCPChainedTest.java
 
b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyTCPChainedTest.java
new file mode 100644
index 0000000..4c4904b
--- /dev/null
+++ 
b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyTCPChainedTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.util.IOHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+
+/**
+ * In this test we are checking that same netty endpoint can be safely called 
twice
+ * in single route with reconnect. It requires for processing to be fully 
async otherwise
+ * {@link io.netty.util.concurrent.BlockingOperationException} is thrown by 
netty.
+ */
+public class NettyTCPChainedTest extends BaseNettyTest {
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    private void sendFile(String uri) throws Exception {
+        Exchange exchange = template.asyncSend(uri, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                // Read from an input stream
+                InputStream is = IOHelper.buffered(new 
FileInputStream("src/test/resources/test.txt"));
+
+                byte buffer[] = IOConverter.toBytes(is);
+                is.close();
+
+                // Set the property of the charset encoding
+                exchange.setProperty(Exchange.CHARSET_NAME, "UTF-8");
+                Message in = exchange.getIn();
+                in.setBody(buffer);
+            }
+        }).get();
+        if (exchange.getException() != null) {
+            throw new AssertionError(exchange.getException());
+        }
+        Assert.assertFalse(exchange.isFailed());
+    }
+
+    @Test
+    public void testTCPChainedConnectionFromCallbackThread() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(2);
+        sendFile("direct:chainedCalls");
+        
+        mock.assertIsSatisfied();
+    }
+    
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("netty4:tcp://localhost:{{port}}?sync=false")
+                    .to("log:result")
+                    .to("mock:result");
+                from("direct:nettyCall")
+                        
.to("netty4:tcp://localhost:{{port}}?sync=false&disconnect=true&workerCount=1");
+                from("direct:chainedCalls")
+                        .to("direct:nettyCall")
+                        .to("direct:nettyCall");
+            }
+        };
+    }
+
+}

Reply via email to