Repository: camel Updated Branches: refs/heads/master 25fb00796 -> dc9144ecb
CAMEL-9637: camel-netty - Allow to reuse previous Channel in next call Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc9144ec Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc9144ec Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc9144ec Branch: refs/heads/master Commit: dc9144ecb92da6bcf9e14828b12c646a5a30aabd Parents: 25fb007 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Feb 24 11:24:32 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Feb 24 11:28:11 2016 +0100 ---------------------------------------------------------------------- .../component/netty4/NettyConfiguration.java | 27 +++++- .../camel/component/netty4/NettyConstants.java | 1 + .../camel/component/netty4/NettyProducer.java | 68 +++++++++++++-- .../netty4/handlers/ClientChannelHandler.java | 3 +- .../component/netty4/NettyReuseChannelTest.java | 90 ++++++++++++++++++++ 5 files changed, 179 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java index cf8dac1..69f9cf9 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java @@ -24,10 +24,12 @@ import java.util.List; import java.util.Map; import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.handler.codec.Delimiters; import io.netty.handler.ssl.SslHandler; import io.netty.util.CharsetUtil; +import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.UriParam; @@ -104,7 +106,9 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem private boolean useByteBuf; @UriParam(label = "advanced") private boolean udpByteArrayCodec; - + @UriParam(label = "producer") + private boolean reuseChannel; + /** * Returns a copy of this configuration @@ -200,7 +204,7 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem // additional netty options, we don't want to store an empty map, so set it as null if empty options = IntrospectionSupport.extractProperties(parameters, "option."); - if (options != null && options.isEmpty()) { + if (options != null && options.isEmpty()) { options = null; } @@ -584,7 +588,7 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem public void setUdpConnectionlessSending(boolean udpConnectionlessSending) { this.udpConnectionlessSending = udpConnectionlessSending; } - + public boolean isClientMode() { return clientMode; } @@ -618,6 +622,23 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem this.udpByteArrayCodec = udpByteArrayCodec; } + public boolean isReuseChannel() { + return reuseChannel; + } + + /** + * This option allows producers to reuse the same Netty {@link Channel} for the lifecycle of processing the {@link Exchange}. + * This is useable if you need to call a server multiple times in a Camel route and want to use the same network connection. + * When using this the channel is not returned to the connection pool until the {@link Exchange} is done; or disconnected + * if the disconnect option is set to true. + * <p/> + * The reused {@link Channel} is stored on the {@link Exchange} as an exchange property with the key {@link NettyConstants#NETTY_CHANNEL} + * which allows you to obtain the channel during routing and use it as well. + */ + public void setReuseChannel(boolean reuseChannel) { + this.reuseChannel = reuseChannel; + } + private static <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) { if (handlers != null) { for (T handler : handlers) { http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java index 5466c2a..fa2a6da 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java @@ -35,6 +35,7 @@ public final class NettyConstants { public static final String NETTY_SSL_CLIENT_CERT_NOT_BEFORE = "CamelNettySSLClientCertNotBefore"; public static final String NETTY_SSL_CLIENT_CERT_NOT_AFTER = "CamelNettySSLClientCertNotAfter"; public static final String NETTY_REQUEST_TIMEOUT = "CamelNettyRequestTimeout"; + public static final String NETTY_CHANNEL = "CamelNettyChannel"; private NettyConstants() { // Utility class http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java index d0d8eb7..c181ebf 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java @@ -42,6 +42,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.CamelLogger; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.IOHelper; @@ -196,11 +197,16 @@ public class NettyProducer extends DefaultAsyncProducer { } // get a channel from the pool - Channel existing; + Channel existing = null; try { - existing = pool.borrowObject(); - if (existing != null) { - LOG.trace("Got channel from pool {}", existing); + if (getConfiguration().isReuseChannel()) { + existing = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class); + } + if (existing == null) { + existing = pool.borrowObject(); + if (existing != null) { + LOG.trace("Got channel from pool {}", existing); + } } } catch (Exception e) { exchange.setException(e); @@ -215,6 +221,50 @@ public class NettyProducer extends DefaultAsyncProducer { return true; } + // remember channel so we can reuse it + if (getConfiguration().isReuseChannel() && exchange.getProperty(NettyConstants.NETTY_CHANNEL) == null) { + final Channel channel = existing; + exchange.setProperty(NettyConstants.NETTY_CHANNEL, existing); + // and defer closing the channel until we are done routing the exchange + exchange.addOnCompletion(new SynchronizationAdapter() { + @Override + public void onComplete(Exchange exchange) { + // should channel be closed after complete? + Boolean close; + if (ExchangeHelper.isOutCapable(exchange)) { + close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } else { + close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } + + // should we disconnect, the header can override the configuration + boolean disconnect = getConfiguration().isDisconnect(); + if (close != null) { + disconnect = close; + } + + if (disconnect) { + LOG.trace("Closing channel {} as routing the Exchange is done", channel); + NettyHelper.close(channel); + } + + try { + // Only put the connected channel back to the pool + if (channel.isActive()) { + LOG.trace("Putting channel back to pool {}", channel); + pool.returnObject(channel); + } else { + // and if its not active then invalidate it + LOG.trace("Invalidating channel from pool {}", channel); + pool.invalidateObject(channel); + } + } catch (Exception e) { + LOG.warn("Error returning channel to pool " + channel + ". This exception will be ignored.", e); + } + } + }); + } + if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) { long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class); ChannelHandler oldHandler = existing.pipeline().get("timeout"); @@ -263,7 +313,9 @@ public class NettyProducer extends DefaultAsyncProducer { if (close != null) { disconnect = close; } - if (disconnect) { + + // we should not close if we are reusing the channel + if (!configuration.isReuseChannel() && disconnect) { if (LOG.isTraceEnabled()) { LOG.trace("Closing channel when complete at address: {}", getEndpoint().getConfiguration().getAddress()); } @@ -466,9 +518,13 @@ public class NettyProducer extends DefaultAsyncProducer { if (channel.isActive()) { LOG.trace("Putting channel back to pool {}", channel); pool.returnObject(channel); + } else { + // and if its not active then invalidate it + LOG.trace("Invalidating channel from pool {}", channel); + pool.invalidateObject(channel); } } catch (Exception e) { - LOG.warn("Error returning channel to pool {}. This exception will be ignored.", channel); + LOG.warn("Error returning channel to pool " + channel + ". This exception will be ignored.", e); } finally { // ensure we call the delegated callback callback.done(doneSync); http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java index ed4d695..b9a2a17 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java @@ -193,7 +193,8 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { if (close != null) { disconnect = close; } - if (disconnect) { + // we should not close if we are reusing the channel + if (!producer.getConfiguration().isReuseChannel() && disconnect) { if (LOG.isTraceEnabled()) { LOG.trace("Closing channel when complete at address: {}", producer.getConfiguration().getAddress()); } http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyReuseChannelTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyReuseChannelTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyReuseChannelTest.java new file mode 100644 index 0000000..7a4cbdd --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyReuseChannelTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4; + +import java.util.ArrayList; +import java.util.List; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +/** + * @version + */ +public class NettyReuseChannelTest extends BaseNettyTest { + + private final List<Channel> channels = new ArrayList<>(); + + @Test + public void testReuse() throws Exception { + NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create(); + + getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:b").expectedBodiesReceived("Hello Hello World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Hello Hello World"); + + template.sendBody("direct:start", "World\n"); + + assertMockEndpointsSatisfied(); + + assertTrue(notify.matchesMockWaitTime()); + + assertEquals(2, channels.size()); + assertSame("Should reuse channel", channels.get(0), channels.get(1)); + assertFalse("And closed when routing done", channels.get(0).isOpen()); + assertFalse("And closed when routing done", channels.get(1).isOpen()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .to("netty4:tcp://localhost:{{port}}?textline=true&sync=true&reuseChannel=true&disconnect=true") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Channel channel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class); + channels.add(channel); + assertTrue("Should be active", channel.isActive()); + } + }) + .to("mock:a") + .to("netty4:tcp://localhost:{{port}}?textline=true&sync=true&reuseChannel=true&disconnect=true") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Channel channel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class); + channels.add(channel); + assertTrue("Should be active", channel.isActive()); + } + }) + .to("mock:b"); + + from("netty4:tcp://localhost:{{port}}?textline=true&sync=true") + .transform(body().prepend("Hello ")) + .to("mock:result"); + } + }; + } +}