Author: davsclaus Date: Thu May 6 13:14:38 2010 New Revision: 941713 URL: http://svn.apache.org/viewvc?rev=941713&view=rev Log: CAMEL-2699: camel-netty will reconnect if not connected. Added lazyChannelCreation option.
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncNotLazyChannelTest.java - copied, changed from r941661, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=941713&r1=941712&r2=941713&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu May 6 13:14:38 2010 @@ -42,7 +42,7 @@ public class NettyConfiguration { private long connectTimeoutMillis; private long receiveTimeoutMillis; private boolean reuseAddress; - private boolean sync; + private boolean sync = true; private String passphrase; private File keyStoreFile; private File trustStoreFile; @@ -58,13 +58,14 @@ public class NettyConfiguration { private String keyStoreFormat; private String securityProvider; private boolean disconnect; + private boolean lazyChannelCreation = true; public NettyConfiguration() { setKeepAlive(true); setTcpNoDelay(true); setBroadcast(false); setReuseAddress(true); - setSync(false); + setSync(true); setConnectTimeoutMillis(10000); setReceiveTimeoutMillis(10000); setSendBufferSize(65536); @@ -72,6 +73,7 @@ public class NettyConfiguration { setSsl(false); setCorePoolSize(10); setMaxPoolSize(100); + setLazyChannelCreation(true); } public void parseURI(URI uri, Map<String, Object> parameters, NettyComponent component) throws Exception { @@ -143,6 +145,9 @@ public class NettyConfiguration { if (settings.containsKey("disconnect")) { setDisconnect(Boolean.valueOf((String) settings.get("disconnect"))); } + if (settings.containsKey("lazyChannelCreation")) { + setLazyChannelCreation(Boolean.valueOf((String) settings.get("lazyChannelCreation"))); + } } public String getProtocol() { @@ -365,6 +370,14 @@ public class NettyConfiguration { this.disconnect = disconnect; } + public boolean isLazyChannelCreation() { + return lazyChannelCreation; + } + + public void setLazyChannelCreation(boolean lazyChannelCreation) { + this.lazyChannelCreation = lazyChannelCreation; + } + public String getAddress() { return host + ":" + port; } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java?rev=941713&r1=941712&r2=941713&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java Thu May 6 13:14:38 2010 @@ -63,7 +63,7 @@ public final class NettyHelper { future.awaitUninterruptibly(); // if it was not a success then thrown an exception - if (future.isSuccess() == false) { + if (!future.isSuccess()) { LOG.warn("Cannot write body: " + body + " using channel: " + channel); throw new CamelExchangeException("Cannot write body", exchange, future.getCause()); } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941713&r1=941712&r2=941713&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May 6 13:14:38 2010 @@ -52,7 +52,6 @@ public class NettyProducer extends Defau private CountDownLatch countdownLatch; private ChannelFactory channelFactory; private DatagramChannelFactory datagramChannelFactory; - private ChannelFuture channelFuture; private Channel channel; private ClientBootstrap clientBootstrap; private ConnectionlessBootstrap connectionlessClientBootstrap; @@ -81,11 +80,15 @@ public class NettyProducer extends Defau @Override protected void doStart() throws Exception { super.doStart(); + if (configuration.getProtocol().equalsIgnoreCase("udp")) { setupUDPCommunication(); } else { setupTCPCommunication(); } + if (!configuration.isLazyChannelCreation()) { + openConnection(); + } } @Override @@ -93,20 +96,18 @@ public class NettyProducer extends Defau if (LOG.isDebugEnabled()) { LOG.debug("Stopping producer at address: " + configuration.getAddress()); } - - // close all channels - ChannelGroupFuture future = allChannels.close(); - future.awaitUninterruptibly(); - - // and then release other resources - if (channelFactory != null) { - channelFactory.releaseExternalResources(); - } - + closeConnection(); super.doStop(); } public void process(Exchange exchange) throws Exception { + if (channel == null && !configuration.isLazyChannelCreation()) { + throw new IllegalStateException("Not started yet!"); + } + if (channel == null || !channel.isConnected()) { + openConnection(); + } + if (configuration.isSync()) { countdownLatch = new CountDownLatch(1); } @@ -164,17 +165,6 @@ public class NettyProducer extends Defau clientPipeline = clientPipelineFactory.getPipeline(); clientBootstrap.setPipeline(clientPipeline); } - - channelFuture = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); - channelFuture.awaitUninterruptibly(); - if (!channelFuture.isSuccess()) { - throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause()); - } - channel = channelFuture.getChannel(); - // to keep track of all channels in use - allChannels.add(channel); - - LOG.info("Netty TCP Producer started and now listening on: " + configuration.getAddress()); } protected void setupUDPCommunication() throws Exception { @@ -199,9 +189,20 @@ public class NettyProducer extends Defau clientPipeline = clientPipelineFactory.getPipeline(); connectionlessClientBootstrap.setPipeline(clientPipeline); } + } + + private void openConnection() throws Exception { + ChannelFuture channelFuture; + + if (clientBootstrap != null) { + channelFuture = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + } else if (connectionlessClientBootstrap != null) { + connectionlessClientBootstrap.bind(new InetSocketAddress(0)); + channelFuture = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + } else { + throw new IllegalStateException("Should either be TCP or UDP"); + } - connectionlessClientBootstrap.bind(new InetSocketAddress(0)); - channelFuture = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); channelFuture.awaitUninterruptibly(); if (!channelFuture.isSuccess()) { throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause()); @@ -210,7 +211,20 @@ public class NettyProducer extends Defau // to keep track of all channels in use allChannels.add(channel); - LOG.info("Netty UDP Producer started and now listening on: " + configuration.getAddress()); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating connector to address: " + configuration.getAddress()); + } + } + + private void closeConnection() throws Exception { + // close all channels + ChannelGroupFuture future = allChannels.close(); + future.awaitUninterruptibly(); + + // and then release other resources + if (channelFactory != null) { + channelFactory.releaseExternalResources(); + } } public NettyConfiguration getConfiguration() { @@ -225,10 +239,6 @@ public class NettyProducer extends Defau return countdownLatch; } - public void setCountdownLatch(CountDownLatch countdownLatch) { - this.countdownLatch = countdownLatch; - } - public ChannelFactory getChannelFactory() { return channelFactory; } @@ -237,14 +247,6 @@ public class NettyProducer extends Defau this.channelFactory = channelFactory; } - public ChannelFuture getChannelFuture() { - return channelFuture; - } - - public void setChannelFuture(ChannelFuture channelFuture) { - this.channelFuture = channelFuture; - } - public ClientBootstrap getClientBootstrap() { return clientBootstrap; } @@ -261,14 +263,6 @@ public class NettyProducer extends Defau this.clientPipelineFactory = clientPipelineFactory; } - public ChannelPipeline getClientPipeline() { - return clientPipeline; - } - - public void setClientPipeline(ChannelPipeline clientPipeline) { - this.clientPipeline = clientPipeline; - } - public ChannelGroup getAllChannels() { return allChannels; } Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java?rev=941713&r1=941712&r2=941713&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java (original) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java Thu May 6 13:14:38 2010 @@ -78,8 +78,9 @@ public class MultipleCodecsTest extends return new RouteBuilder() { public void configure() throws Exception { // START SNIPPET: routes - from("direct:multiple-codec").to("netty:tcp://localhost:5150?encoders=#encoders"); - from("netty:tcp://localhost:5150?decoders=#length-decoder,#string-decoder").to("mock:multiple-codec"); + from("direct:multiple-codec").to("netty:tcp://localhost:5150?encoders=#encoders&sync=false"); + + from("netty:tcp://localhost:5150?decoders=#length-decoder,#string-decoder&sync=false").to("mock:multiple-codec"); // START SNIPPET: routes } }; Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java?rev=941713&r1=941712&r2=941713&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java (original) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java Thu May 6 13:14:38 2010 @@ -49,7 +49,7 @@ public class NettyConcurrentTest extends final int index = i; Future out = executor.submit(new Callable<Object>() { public Object call() throws Exception { - return template.requestBody("netty:tcp://localhost:5150?sync=true", index, String.class); + return template.requestBody("netty:tcp://localhost:5150", index, String.class); } }); responses.put(index, out); Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java?rev=941713&r1=941712&r2=941713&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java (original) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java Thu May 6 13:14:38 2010 @@ -68,7 +68,7 @@ public class NettyTCPAsyncTest extends C MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); - sendFile("netty:tcp://localhost:5150"); + sendFile("netty:tcp://localhost:5150?sync=false"); mock.assertIsSatisfied(); if (LOG.isDebugEnabled()) { @@ -81,7 +81,7 @@ public class NettyTCPAsyncTest extends C return new RouteBuilder() { @Override public void configure() throws Exception { - from("netty:tcp://localhost:5150") + from("netty:tcp://localhost:5150?sync=false") .to("mock:result"); } }; Copied: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncNotLazyChannelTest.java (from r941661, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncNotLazyChannelTest.java?p2=camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncNotLazyChannelTest.java&p1=camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java&r1=941661&r2=941713&rev=941713&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java (original) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncNotLazyChannelTest.java Thu May 6 13:14:38 2010 @@ -26,8 +26,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Test; -public class NettyTCPSyncTest extends CamelTestSupport { - private static final transient Log LOG = LogFactory.getLog(NettyTCPSyncTest.class); +public class NettyTCPSyncNotLazyChannelTest extends CamelTestSupport { + private static final transient Log LOG = LogFactory.getLog(NettyTCPSyncNotLazyChannelTest.class); @Produce(uri = "direct:start") protected ProducerTemplate producerTemplate; @@ -37,12 +37,12 @@ public class NettyTCPSyncTest extends Ca if (LOG.isDebugEnabled()) { LOG.debug("Beginning Test ---> testTCPInOutWithNettyConsumer()"); } - + String response = producerTemplate.requestBody( - "netty:tcp://localhost:5150?sync=true", - "Epitaph in Kohima, India marking the WWII Battle of Kohima and Imphal, Burma Campaign - Attributed to John Maxwell Edmonds", String.class); + "netty:tcp://localhost:5151?sync=true&lazyChannelCreation=false", + "Epitaph in Kohima, India marking the WWII Battle of Kohima and Imphal, Burma Campaign - Attributed to John Maxwell Edmonds", String.class); assertEquals("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.", response); - + if (LOG.isDebugEnabled()) { LOG.debug("Completed Test ---> testTCPInOutWithNettyConsumer()"); } @@ -53,22 +53,22 @@ public class NettyTCPSyncTest extends Ca if (LOG.isDebugEnabled()) { LOG.debug("Beginning Test ---> testUDPInOutWithNettyConsumer()"); } - + Poetry poetry = new Poetry(); - Poetry response = (Poetry) producerTemplate.requestBody("netty:tcp://localhost:5150?sync=true", poetry); + Poetry response = (Poetry) producerTemplate.requestBody("netty:tcp://localhost:5151?sync=true&lazyChannelCreation=false", poetry); assertEquals("Dr. Sarojini Naidu", response.getPoet()); - + if (LOG.isDebugEnabled()) { LOG.debug("Completed Test ---> testUDPInOutWithNettyConsumer()"); } - } - + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { - from("netty:tcp://localhost:5150?sync=true") + from("netty:tcp://localhost:5151?sync=true") .process(new Processor() { public void process(Exchange exchange) throws Exception { if (exchange.getIn().getBody() instanceof Poetry) { @@ -77,11 +77,11 @@ public class NettyTCPSyncTest extends Ca exchange.getOut().setBody(poetry); return; } - exchange.getOut().setBody("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today."); + exchange.getOut().setBody("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today."); } - }); + }); } }; } -} +} \ No newline at end of file Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java?rev=941713&r1=941712&r2=941713&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java (original) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java Thu May 6 13:14:38 2010 @@ -68,7 +68,7 @@ public class NettyUDPAsyncTest extends C MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); - sendFile("netty:udp://localhost:5151"); + sendFile("netty:udp://localhost:5151?sync=false"); mock.assertIsSatisfied(); if (LOG.isDebugEnabled()) { @@ -82,7 +82,7 @@ public class NettyUDPAsyncTest extends C return new RouteBuilder() { @Override public void configure() throws Exception { - from("netty:udp://localhost:5151") + from("netty:udp://localhost:5151?sync=false") .to("mock:result") .to("log:Message"); } Modified: camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml?rev=941713&r1=941712&r2=941713&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml (original) +++ camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml Thu May 6 13:14:38 2010 @@ -16,56 +16,57 @@ limitations under the License. --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns:util="http://www.springframework.org/schema/util" - xsi:schemaLocation=" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> - - <!-- START SNIPPET: routes --> - <camelContext id="multiple-netty-codecs-context" xmlns="http://camel.apache.org/schema/spring"> - <route> - <from uri="direct:multiple-codec" /> - <to uri="netty:tcp://localhost:5150?encoders=#encoders" /> - </route> - <route> - <from uri="netty:tcp://localhost:5150?decoders=#length-decoder,#string-decoder" /> - <to uri="mock:multiple-codec" /> - </route> - </camelContext> - <!-- END SNIPPET: routes --> - - <!-- START SNIPPET: registry-beans --> - <util:list id="decoders" list-class="java.util.LinkedList"> - <bean class="org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder"> - <constructor-arg value="1048576" /> - <constructor-arg value="0" /> - <constructor-arg value="4" /> - <constructor-arg value="0" /> - <constructor-arg value="4" /> + + <!-- START SNIPPET: routes --> + <camelContext id="multiple-netty-codecs-context" xmlns="http://camel.apache.org/schema/spring"> + <route> + <from uri="direct:multiple-codec"/> + <to uri="netty:tcp://localhost:5150?encoders=#encoders&sync=false"/> + </route> + <route> + <from uri="netty:tcp://localhost:5150?decoders=#length-decoder,#string-decoder&sync=false"/> + <to uri="mock:multiple-codec"/> + </route> + </camelContext> + <!-- END SNIPPET: routes --> + + <!-- START SNIPPET: registry-beans --> + <util:list id="decoders" list-class="java.util.LinkedList"> + <bean class="org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder"> + <constructor-arg value="1048576"/> + <constructor-arg value="0"/> + <constructor-arg value="4"/> + <constructor-arg value="0"/> + <constructor-arg value="4"/> + </bean> + <bean class="org.jboss.netty.handler.codec.string.StringDecoder"/> + </util:list> + + <util:list id="encoders" list-class="java.util.LinkedList"> + <bean class="org.jboss.netty.handler.codec.frame.LengthFieldPrepender"> + <constructor-arg value="4"/> + </bean> + <bean class="org.jboss.netty.handler.codec.string.StringEncoder"/> + </util:list> + + <bean id="length-encoder" class="org.jboss.netty.handler.codec.frame.LengthFieldPrepender"> + <constructor-arg value="4"/> </bean> - <bean class="org.jboss.netty.handler.codec.string.StringDecoder" /> - </util:list> - - <util:list id="encoders" list-class="java.util.LinkedList"> - <bean class="org.jboss.netty.handler.codec.frame.LengthFieldPrepender"> - <constructor-arg value="4" /> + <bean id="string-encoder" class="org.jboss.netty.handler.codec.string.StringEncoder"/> + + <bean id="length-decoder" class="org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder"> + <constructor-arg value="1048576"/> + <constructor-arg value="0"/> + <constructor-arg value="4"/> + <constructor-arg value="0"/> + <constructor-arg value="4"/> </bean> - <bean class="org.jboss.netty.handler.codec.string.StringEncoder" /> - </util:list> - - <bean id="length-encoder" class="org.jboss.netty.handler.codec.frame.LengthFieldPrepender"> - <constructor-arg value="4" /> - </bean> - <bean id="string-encoder" class="org.jboss.netty.handler.codec.string.StringEncoder" /> - - <bean id="length-decoder" class="org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder"> - <constructor-arg value="1048576" /> - <constructor-arg value="0" /> - <constructor-arg value="4" /> - <constructor-arg value="0" /> - <constructor-arg value="4" /> - </bean> - <bean id="string-decoder" class="org.jboss.netty.handler.codec.string.StringDecoder" /> - <!-- START SNIPPET: registry-beans --> + <bean id="string-decoder" class="org.jboss.netty.handler.codec.string.StringDecoder"/> + <!-- START SNIPPET: registry-beans --> + </beans>