Repository: camel Updated Branches: refs/heads/camel-2.17.x e500ce34a -> a2eb47e1e
CAMEL-10244: Make connection establishment fully async Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7acb3750 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7acb3750 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7acb3750 Branch: refs/heads/camel-2.17.x Commit: 7acb3750bd82bec64e255d39780f393ffac6f824 Parents: e500ce3 Author: Vitalii Tymchyshyn <v...@tym.im> Authored: Sat Aug 13 13:25:19 2016 -0400 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Aug 15 17:38:58 2016 +0200 ---------------------------------------------------------------------- .../camel/component/netty4/NettyProducer.java | 209 +++++++++++-------- .../component/netty4/NettyTCPChainedTest.java | 89 ++++++++ 2 files changed, 216 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7acb3750/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java index a1b8d7f..4008422 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java @@ -60,7 +60,7 @@ public class NettyProducer extends DefaultAsyncProducer { private ClientInitializerFactory pipelineFactory; private CamelLogger noReplyLogger; private EventLoopGroup workerGroup; - private ObjectPool<Channel> pool; + private ObjectPool<ChannelFuture> pool; private Map<Channel, NettyCamelState> nettyCamelStatesMap = new ConcurrentHashMap<Channel, NettyCamelState>(); public NettyProducer(NettyEndpoint nettyEndpoint, 
NettyConfiguration configuration) { @@ -111,14 +111,14 @@ public class NettyProducer extends DefaultAsyncProducer { config.timeBetweenEvictionRunsMillis = 30 * 1000L; config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle(); config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL; - pool = new GenericObjectPool<Channel>(new NettyProducerPoolableObjectFactory(), config); + pool = new GenericObjectPool<ChannelFuture>(new NettyProducerPoolableObjectFactory(), config); if (LOG.isDebugEnabled()) { LOG.debug("Created NettyProducer pool[maxActive={}, minIdle={}, maxIdle={}, minEvictableIdleTimeMillis={}] -> {}", new Object[]{config.maxActive, config.minIdle, config.maxIdle, config.minEvictableIdleTimeMillis, pool}); } } else { - pool = new SharedSingletonObjectPool<Channel>(new NettyProducerPoolableObjectFactory()); + pool = new SharedSingletonObjectPool<ChannelFuture>(new NettyProducerPoolableObjectFactory()); if (LOG.isDebugEnabled()) { LOG.info("Created NettyProducer shared singleton pool -> {}", pool); } @@ -134,8 +134,9 @@ public class NettyProducer extends DefaultAsyncProducer { if (!configuration.isLazyChannelCreation()) { // ensure the connection can be established when we start up - Channel channel = pool.borrowObject(); - pool.returnObject(channel); + ChannelFuture channelFuture = pool.borrowObject(); + channelFuture.get(); + pool.returnObject(channelFuture); } } @@ -197,16 +198,19 @@ public class NettyProducer extends DefaultAsyncProducer { } // get a channel from the pool - Channel existing = null; + ChannelFuture channelFuture = null; + Channel channel = null; try { if (getConfiguration().isReuseChannel()) { - existing = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class); + channel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class); } - if (existing == null) { - existing = pool.borrowObject(); - if (existing != null) { - LOG.trace("Got channel from pool {}", existing); + if (channel == 
null) { + channelFuture = pool.borrowObject(); + if (channelFuture != null) { + LOG.trace("Got channel request from pool {}", channelFuture); } + } else { + channelFuture = channel.newSucceededFuture(); } } catch (Exception e) { exchange.setException(e); @@ -215,16 +219,22 @@ public class NettyProducer extends DefaultAsyncProducer { } // we must have a channel - if (existing == null) { + if (channelFuture == null) { exchange.setException(new CamelExchangeException("Cannot get channel from pool", exchange)); callback.done(true); return true; } + channelFuture.addListener(new ChannelConnectedListener(exchange, callback, body)); + return false; + } + + public void processWithConnectedChannel(final Exchange exchange, AsyncCallback callback, ChannelFuture channelFuture, + Object body) { // remember channel so we can reuse it + final Channel channel = channelFuture.channel(); if (getConfiguration().isReuseChannel() && exchange.getProperty(NettyConstants.NETTY_CHANNEL) == null) { - final Channel channel = existing; - exchange.setProperty(NettyConstants.NETTY_CHANNEL, existing); + exchange.setProperty(NettyConstants.NETTY_CHANNEL, channel); // and defer closing the channel until we are done routing the exchange exchange.addOnCompletion(new SynchronizationAdapter() { @Override @@ -248,36 +258,22 @@ public class NettyProducer extends DefaultAsyncProducer { NettyHelper.close(channel); } - try { - // Only put the connected channel back to the pool - if (channel.isActive()) { - LOG.trace("Putting channel back to pool {}", channel); - pool.returnObject(channel); - } else { - // and if its not active then invalidate it - LOG.trace("Invalidating channel from pool {}", channel); - pool.invalidateObject(channel); - } - } catch (Exception e) { - LOG.warn("Error returning channel to pool " + channel + ". 
This exception will be ignored.", e); - } + releaseChannel(channelFuture); } }); } if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) { long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class); - ChannelHandler oldHandler = existing.pipeline().get("timeout"); + ChannelHandler oldHandler = channel.pipeline().get("timeout"); ReadTimeoutHandler newHandler = new ReadTimeoutHandler(timeoutInMs, TimeUnit.MILLISECONDS); if (oldHandler == null) { - existing.pipeline().addBefore("handler", "timeout", newHandler); + channel.pipeline().addBefore("handler", "timeout", newHandler); } else { - existing.pipeline().replace(oldHandler, "timeout", newHandler); + channel.pipeline().replace(oldHandler, "timeout", newHandler); } } - // need to declare as final - final Channel channel = existing; final AsyncCallback producerCallback; if (configuration.isReuseChannel()) { @@ -285,7 +281,7 @@ public class NettyProducer extends DefaultAsyncProducer { // as when reuse channel is enabled it will put the channel back in the pool when exchange is done using on completion producerCallback = callback; } else { - producerCallback = new NettyProducerCallback(channel, callback); + producerCallback = new NettyProducerCallback(channelFuture, callback); } // setup state as attachment on the channel, so we can access the state later when needed @@ -337,8 +333,6 @@ public class NettyProducer extends DefaultAsyncProducer { } }); - // continue routing asynchronously - return false; } /** @@ -466,30 +460,38 @@ public class NettyProducer extends DefaultAsyncProducer { } } - protected Channel openChannel(ChannelFuture channelFuture) throws Exception { + protected void notifyChannelOpen(ChannelFuture channelFuture) throws Exception { // blocking for channel to be done if (LOG.isTraceEnabled()) { - LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout()); + LOG.trace("Channel open finished 
with {}", channelFuture); } - // wait for the channel to be open (see io.netty.channel.ChannelFuture javadoc for example/recommendation) - channelFuture.awaitUninterruptibly(); + if (channelFuture.isSuccess()) { + Channel answer = channelFuture.channel(); + // to keep track of all channels in use + allChannels.add(answer); - if (!channelFuture.isDone() || !channelFuture.isSuccess()) { - ConnectException cause = new ConnectException("Cannot connect to " + configuration.getAddress()); - if (channelFuture.cause() != null) { - cause.initCause(channelFuture.cause()); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating connector to address: {}", configuration.getAddress()); } - throw cause; } - Channel answer = channelFuture.channel(); - // to keep track of all channels in use - allChannels.add(answer); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Creating connector to address: {}", configuration.getAddress()); + protected void releaseChannel(ChannelFuture channelFuture) { + Channel channel = channelFuture.channel(); + try { + // Only put the connected channel back to the pool + if (channel.isActive()) { + LOG.trace("Putting channel back to pool {}", channel); + pool.returnObject(channelFuture); + } else { + // and if its not active then invalidate it + LOG.trace("Invalidating channel from pool {}", channel); + pool.invalidateObject(channelFuture); + } + } catch (Exception e) { + LOG.warn("Error returning channel to pool " + channel + ". 
This exception will be ignored.", e); } - return answer; } public NettyConfiguration getConfiguration() { @@ -510,11 +512,11 @@ public class NettyProducer extends DefaultAsyncProducer { */ private final class NettyProducerCallback implements AsyncCallback { - private final Channel channel; + private final ChannelFuture channelFuture; private final AsyncCallback callback; - private NettyProducerCallback(Channel channel, AsyncCallback callback) { - this.channel = channel; + private NettyProducerCallback(ChannelFuture channelFuture, AsyncCallback callback) { + this.channelFuture = channelFuture; this.callback = callback; } @@ -522,17 +524,7 @@ public class NettyProducer extends DefaultAsyncProducer { public void done(boolean doneSync) { // put back in pool try { - // Only put the connected channel back to the pool - if (channel.isActive()) { - LOG.trace("Putting channel back to pool {}", channel); - pool.returnObject(channel); - } else { - // and if its not active then invalidate it - LOG.trace("Invalidating channel from pool {}", channel); - pool.invalidateObject(channel); - } - } catch (Exception e) { - LOG.warn("Error returning channel to pool " + channel + ". This exception will be ignored.", e); + releaseChannel(channelFuture); } finally { // ensure we call the delegated callback callback.done(doneSync); @@ -543,44 +535,97 @@ public class NettyProducer extends DefaultAsyncProducer { /** * Object factory to create {@link Channel} used by the pool. 
*/ - private final class NettyProducerPoolableObjectFactory implements PoolableObjectFactory<Channel> { + private final class NettyProducerPoolableObjectFactory implements PoolableObjectFactory<ChannelFuture> { @Override - public Channel makeObject() throws Exception { - ChannelFuture channelFuture = openConnection(); - Channel answer = openChannel(channelFuture); - LOG.trace("Created channel: {}", answer); - return answer; + public ChannelFuture makeObject() throws Exception { + ChannelFuture channelFuture = openConnection().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + notifyChannelOpen(future); + } + }); + LOG.trace("Requested channel: {}", channelFuture); + return channelFuture; } @Override - public void destroyObject(Channel channel) throws Exception { - LOG.trace("Destroying channel: {}", channel); - if (channel.isOpen()) { - NettyHelper.close(channel); - } - allChannels.remove(channel); + public void destroyObject(ChannelFuture channelFuture) throws Exception { + LOG.trace("Destroying channel request: {}", channelFuture); + channelFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + Channel channel = future.channel(); + if (channel.isOpen()) { + NettyHelper.close(channel); + } + allChannels.remove(channel); + } + }); + channelFuture.cancel(false); } @Override - public boolean validateObject(Channel channel) { - // we need a connected channel to be valid + public boolean validateObject(ChannelFuture channelFuture) { + // we need a connecting or connected channel to be valid + if (!channelFuture.isDone()) { + LOG.trace("Validating connecting channel request: {} -> {}", channelFuture, true); + return true; + } + if (!channelFuture.isSuccess()) { + LOG.trace("Validating unsuccessful channel request: {} -> {}", channelFuture, false); + return false; + } + Channel channel = channelFuture.channel(); 
boolean answer = channel.isActive(); LOG.trace("Validating channel: {} -> {}", channel, answer); return answer; } @Override - public void activateObject(Channel channel) throws Exception { + public void activateObject(ChannelFuture channelFuture) throws Exception { // noop - LOG.trace("activateObject channel: {} -> {}", channel); + LOG.trace("activateObject channel request: {} -> {}", channelFuture); } @Override - public void passivateObject(Channel channel) throws Exception { + public void passivateObject(ChannelFuture channelFuture) throws Exception { // noop - LOG.trace("passivateObject channel: {} -> {}", channel); + LOG.trace("passivateObject channel request: {} -> {}", channelFuture); } } + /** + * Listener waiting for connection finished while processing exchange + */ + private class ChannelConnectedListener implements ChannelFutureListener { + private final Exchange exchange; + private final AsyncCallback callback; + private final Object body; + + public ChannelConnectedListener(Exchange exchange, AsyncCallback callback, Object body) { + this.exchange = exchange; + this.callback = callback; + this.body = body; + } + + @Override + public void operationComplete(ChannelFuture future) { + if (!future.isDone() || !future.isSuccess()) { + ConnectException cause = new ConnectException("Cannot connect to " + configuration.getAddress()); + if (future.cause() != null) { + cause.initCause(future.cause()); + } + exchange.setException(cause); + callback.done(false); + + } + try { + processWithConnectedChannel(exchange, callback, future, body); + } catch (Throwable e) { + exchange.setException(e); + callback.done(false); + } + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/7acb3750/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyTCPChainedTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyTCPChainedTest.java 
b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyTCPChainedTest.java new file mode 100644 index 0000000..4c4904b --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyTCPChainedTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.converter.IOConverter; +import org.apache.camel.util.IOHelper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.FileInputStream; +import java.io.InputStream; + +/** + * In this test we are checking that same netty endpoint can be safely called twice + * in single route with reconnect. It requires for processing to be fully async otherwise + * {@link io.netty.util.concurrent.BlockingOperationException} is thrown by netty. 
+ */ +public class NettyTCPChainedTest extends BaseNettyTest { + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + private void sendFile(String uri) throws Exception { + Exchange exchange = template.asyncSend(uri, new Processor() { + public void process(Exchange exchange) throws Exception { + // Read from an input stream + InputStream is = IOHelper.buffered(new FileInputStream("src/test/resources/test.txt")); + + byte buffer[] = IOConverter.toBytes(is); + is.close(); + + // Set the property of the charset encoding + exchange.setProperty(Exchange.CHARSET_NAME, "UTF-8"); + Message in = exchange.getIn(); + in.setBody(buffer); + } + }).get(); + if (exchange.getException() != null) { + throw new AssertionError(exchange.getException()); + } + Assert.assertFalse(exchange.isFailed()); + } + + @Test + public void testTCPChainedConnectionFromCallbackThread() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(2); + sendFile("direct:chainedCalls"); + + mock.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("netty4:tcp://localhost:{{port}}?sync=false") + .to("log:result") + .to("mock:result"); + from("direct:nettyCall") + .to("netty4:tcp://localhost:{{port}}?sync=false&disconnect=true&workerCount=1"); + from("direct:chainedCalls") + .to("direct:nettyCall") + .to("direct:nettyCall"); + } + }; + } + +}