Author: davsclaus Date: Wed Jun 13 11:36:58 2012 New Revision: 1349765 URL: http://svn.apache.org/viewvc?rev=1349765&view=rev Log: CAMEL-5151: Fixed camel-netty to support proxy use cases.
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java?rev=1349765&r1=1349764&r2=1349765&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java Wed Jun 13 11:36:58 2012 @@ -18,11 +18,11 @@ package org.apache.camel.component.netty import java.net.SocketAddress; -import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.NoTypeConversionAvailableException; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,16 +73,16 @@ public final class NettyHelper { } /** - * Writes the given body to Netty channel. Will wait until the body has been written. + * Writes the given body to Netty channel. Will <b>not</b >wait until the body has been written. * * @param channel the Netty channel * @param remoteAddress the remote address when using UDP * @param body the body to write (send) * @param exchange the exchange - * @throws CamelExchangeException is thrown if the body could not be written for some reasons - * (eg remote connection is closed etc.) + * @param listener listener with work to be executed when the operation is complete */ - public static void writeBodySync(Channel channel, SocketAddress remoteAddress, Object body, Exchange exchange) throws CamelExchangeException { + public static void writeBodyAsync(Channel channel, SocketAddress remoteAddress, Object body, + Exchange exchange, ChannelFutureListener listener) { // the write operation is asynchronous. Use future to wait until the session has been written ChannelFuture future; if (remoteAddress != null) { @@ -91,15 +91,7 @@ public final class NettyHelper { future = channel.write(body); } - // wait for the write - LOG.trace("Waiting for write to complete"); - future.awaitUninterruptibly(); - - // if it was not a success then thrown an exception - if (!future.isSuccess()) { - LOG.warn("Cannot write body: " + body + " using channel: " + channel); - throw new CamelExchangeException("Cannot write body", exchange, future.getCause()); - } + future.addListener(listener); } /** Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1349765&r1=1349764&r2=1349765&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Wed Jun 13 11:36:58 2012 @@ -17,6 +17,7 @@ package org.apache.camel.component.netty; import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -172,6 +173,7 @@ public class NettyProducer extends Defau // allow to reuse channel, on this producer, to avoid creating a new connection // for each message being sent if (channelFuture == null || channel == null || !channel.isOpen()) { + channel = null; channelFuture = openConnection(); channel = openChannel(channelFuture); } @@ -285,6 +287,7 @@ public class NettyProducer extends Defau // set the pipeline factory, which creates the pipeline for each newly created channels clientBootstrap.setPipelineFactory(pipelineFactory); answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + LOG.trace("Created new TCP client bootstrap connecting to {}:{}", configuration.getHost(), configuration.getPort()); return answer; } else { ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); @@ -302,17 +305,31 @@ public class NettyProducer extends Defau Channel channel = connectionlessClientBootstrap.bind(new InetSocketAddress(0)); ALL_CHANNELS.add(channel); answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + LOG.trace("Created new UDP client bootstrap connecting to {}:{}", configuration.getHost(), configuration.getPort()); return answer; } } private Channel openChannel(ChannelFuture channelFuture) throws Exception { - // wait until we got connection - channelFuture.awaitUninterruptibly(); + // wait until until the operation is complete + final CountDownLatch latch = new CountDownLatch(1); + channelFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + LOG.debug("Operation complete {}", channelFuture); + latch.countDown(); + } + }); + // blocking for channel to be done + LOG.trace("Waiting for operation to complete {}", channelFuture); + latch.await(); + if (!channelFuture.isSuccess()) { + // clear channel as we did not connect + channel = null; throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause()); } - Channel channel = channelFuture.getChannel(); + channel = channelFuture.getChannel(); // to keep track of all channels in use ALL_CHANNELS.add(channel); Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=1349765&r1=1349764&r2=1349765&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Wed Jun 13 11:36:58 2012 @@ -16,7 +16,10 @@ */ package org.apache.camel.component.netty.handlers; +import java.net.SocketAddress; + import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.component.netty.NettyConstants; @@ -26,6 +29,8 @@ import org.apache.camel.component.netty. import org.apache.camel.util.CamelLogger; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.IOHelper; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; @@ -156,31 +161,56 @@ public class ServerChannelHandler extend // we got a body to write LOG.debug("Writing body: {}", body); + ChannelFutureListener listener = new ResponseFutureListener(exchange, messageEvent.getRemoteAddress()); if (consumer.getConfiguration().isTcp()) { - NettyHelper.writeBodySync(messageEvent.getChannel(), null, body, exchange); + NettyHelper.writeBodyAsync(messageEvent.getChannel(), null, body, exchange, listener); } else { - NettyHelper.writeBodySync(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange); + NettyHelper.writeBodyAsync(messageEvent.getChannel(), messageEvent.getRemoteAddress(), body, exchange, listener); } } + } - // should channel be closed after complete? - Boolean close; - if (ExchangeHelper.isOutCapable(exchange)) { - close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); - } else { - close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); - } + /** + * A {@link ChannelFutureListener} that performs the disconnect logic when + * sending the response is complete. + */ + private final class ResponseFutureListener implements ChannelFutureListener { + + private final Exchange exchange; + private final SocketAddress remoteAddress; - // should we disconnect, the header can override the configuration - boolean disconnect = consumer.getConfiguration().isDisconnect(); - if (close != null) { - disconnect = close; + private ResponseFutureListener(Exchange exchange, SocketAddress remoteAddress) { + this.exchange = exchange; + this.remoteAddress = remoteAddress; } - if (disconnect) { - if (LOG.isDebugEnabled()) { - LOG.debug("Closing channel when complete at address: {}", messageEvent.getRemoteAddress()); + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + // if it was not a success then thrown an exception + if (!future.isSuccess()) { + Exception e = new CamelExchangeException("Cannot write response to " + remoteAddress, exchange, future.getCause()); + consumer.getExceptionHandler().handleException(e); + } + + // should channel be closed after complete? + Boolean close; + if (ExchangeHelper.isOutCapable(exchange)) { + close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } else { + close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } + + // should we disconnect, the header can override the configuration + boolean disconnect = consumer.getConfiguration().isDisconnect(); + if (close != null) { + disconnect = close; + } + if (disconnect) { + if (LOG.isDebugEnabled()) { + LOG.debug("Closing channel when complete at address: {}", remoteAddress); + } + NettyHelper.close(future.getChannel()); } - NettyHelper.close(messageEvent.getChannel()); } } Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java?rev=1349765&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java (added) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java Wed Jun 13 11:36:58 2012 @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +/** + * + */ +public class NettyProxyTest extends BaseNettyTest { + + private int port1; + private int port2; + + @Test + public void testNettyProxy() throws Exception { + getMockEndpoint("mock:before").expectedBodiesReceived("Camel"); + getMockEndpoint("mock:proxy").expectedBodiesReceived("Camel"); + getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel"); + + Object body = template.requestBody("netty:tcp://localhost:" + port1 + "?sync=true&textline=true", "Camel\n"); + assertEquals("Bye Camel", body); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + port1 = getPort(); + port2 = getNextPort(); + + fromF("netty:tcp://localhost:%s?sync=true&textline=true", port1) + .to("mock:before") + .toF("netty:tcp://localhost:%s?sync=true&textline=true", port2) + .to("mock:after"); + + fromF("netty:tcp://localhost:%s?sync=true&textline=true", port2) + .to("mock:proxy") + .transform().simple("Bye ${body}\n"); + } + }; + } +}