Author: davsclaus Date: Thu May 6 15:01:34 2010 New Revision: 941765 URL: http://svn.apache.org/viewvc?rev=941765&view=rev Log: Aligned timeout options with camel-mina. Fixed copy of configuration.
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java (with props) camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java (with props) Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java?rev=941765&r1=941764&r2=941765&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java Thu May 6 15:01:34 2010 @@ -24,23 +24,36 @@ import org.apache.camel.Endpoint; import org.apache.camel.impl.DefaultComponent; public class NettyComponent extends DefaultComponent { - private NettyConfiguration config; + private NettyConfiguration configuration; public NettyComponent() { - config = new NettyConfiguration(); } public NettyComponent(CamelContext context) { super(context); - config = new NettyConfiguration(); } @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + NettyConfiguration config; + if (configuration != null) { + config = configuration.copy(); + } else { + config = new NettyConfiguration(); + } + config.parseURI(new URI(remaining), parameters, this); NettyEndpoint nettyEndpoint = new NettyEndpoint(remaining, this, config); setProperties(nettyEndpoint.getConfiguration(), parameters); return nettyEndpoint; } + + public NettyConfiguration getConfiguration() { + return configuration; + } + + public void setConfiguration(NettyConfiguration configuration) { + this.configuration = configuration; + } } \ No newline at end of file Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=941765&r1=941764&r2=941765&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu May 6 15:01:34 2010 @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.util.URISupport; import org.jboss.netty.channel.ChannelDownstreamHandler; import org.jboss.netty.channel.ChannelHandler; @@ -32,16 +33,16 @@ import org.jboss.netty.handler.codec.ser import org.jboss.netty.handler.ssl.SslHandler; @SuppressWarnings("unchecked") -public class NettyConfiguration { +public class NettyConfiguration implements Cloneable { private String protocol; private String host; private int port; - private boolean keepAlive; - private boolean tcpNoDelay; + private boolean keepAlive = true; + private boolean tcpNoDelay = true; private boolean broadcast; - private long connectTimeoutMillis; - private long receiveTimeoutMillis; - private boolean reuseAddress; + private long connectTimeout = 10000; + private long timeout = 30000; + private boolean reuseAddress = true; private boolean sync = true; private String passphrase; private File keyStoreFile; @@ -51,30 +52,31 @@ public class NettyConfiguration { private List<ChannelUpstreamHandler> decoders = new ArrayList<ChannelUpstreamHandler>(); private ChannelHandler handler; private boolean ssl; - private long sendBufferSize; - private long receiveBufferSize; - private int corePoolSize; - private int maxPoolSize; + private long sendBufferSize = 65536; + private long receiveBufferSize = 65536; + private int corePoolSize = 10; + private int maxPoolSize = 100; private String keyStoreFormat; private String securityProvider; private boolean disconnect; private boolean lazyChannelCreation = true; private boolean transferExchange; - public NettyConfiguration() { - setKeepAlive(true); - setTcpNoDelay(true); - setBroadcast(false); - setReuseAddress(true); - setSync(true); - setConnectTimeoutMillis(10000); - setReceiveTimeoutMillis(10000); - setSendBufferSize(65536); - setReceiveBufferSize(65536); - setSsl(false); - setCorePoolSize(10); - setMaxPoolSize(100); - setLazyChannelCreation(true); + /** + * Returns a copy of this configuration + */ + public NettyConfiguration copy() { + try { + NettyConfiguration answer = (NettyConfiguration) clone(); + // make sure the lists is copied in its own instance + List<ChannelDownstreamHandler> encodersCopy = new ArrayList<ChannelDownstreamHandler>(encoders); + answer.setEncoders(encodersCopy); + List<ChannelUpstreamHandler> decodersCopy = new ArrayList<ChannelUpstreamHandler>(decoders); + answer.setDecoders(decodersCopy); + return answer; + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } } public void parseURI(URI uri, Map<String, Object> parameters, NettyComponent component) throws Exception { @@ -120,13 +122,13 @@ public class NettyConfiguration { setReuseAddress(Boolean.valueOf((String) settings.get("reuseAddress"))); } if (settings.containsKey("connectTimeoutMillis")) { - setConnectTimeoutMillis(Long.valueOf((String) settings.get("connectTimeoutMillis"))); + setConnectTimeout(Long.valueOf((String) settings.get("connectTimeoutMillis"))); } if (settings.containsKey("sync")) { setTcpNoDelay(Boolean.valueOf((String) settings.get("sync"))); } if (settings.containsKey("receiveTimeoutMillis")) { - setReceiveTimeoutMillis(Long.valueOf((String) settings.get("receiveTimeoutMillis"))); + setTimeout(Long.valueOf((String) settings.get("receiveTimeoutMillis"))); } if (settings.containsKey("sendBufferSize")) { setSendBufferSize(Long.valueOf((String) settings.get("sendBufferSize"))); @@ -202,12 +204,12 @@ public class NettyConfiguration { this.broadcast = broadcast; } - public long getConnectTimeoutMillis() { - return connectTimeoutMillis; + public long getConnectTimeout() { + return connectTimeout; } - public void setConnectTimeoutMillis(long connectTimeoutMillis) { - this.connectTimeoutMillis = connectTimeoutMillis; + public void setConnectTimeout(long connectTimeout) { + this.connectTimeout = connectTimeout; } public boolean isReuseAddress() { @@ -278,12 +280,12 @@ public class NettyConfiguration { this.handler = handler; } - public long getReceiveTimeoutMillis() { - return receiveTimeoutMillis; + public long getTimeout() { + return timeout; } - public void setReceiveTimeoutMillis(long receiveTimeoutMillis) { - this.receiveTimeoutMillis = receiveTimeoutMillis; + public void setTimeout(long timeout) { + this.timeout = timeout; } public long getSendBufferSize() { Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=941765&r1=941764&r2=941765&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Thu May 6 15:01:34 2010 @@ -144,7 +144,7 @@ public class NettyConsumer extends Defau serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); - serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis()); + serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout()); channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); // to keep track of all channels in use @@ -161,7 +161,7 @@ public class NettyConsumer extends Defau connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); - connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis()); + connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout()); connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast()); connectionlessServerBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize()); connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize()); Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941765&r1=941764&r2=941765&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May 6 15:01:34 2010 @@ -134,9 +134,9 @@ public class NettyProducer extends Defau NettyHelper.writeBody(channel, null, body, exchange); if (configuration.isSync()) { - boolean success = countdownLatch.await(configuration.getReceiveTimeoutMillis(), TimeUnit.MILLISECONDS); + boolean success = countdownLatch.await(configuration.getTimeout(), TimeUnit.MILLISECONDS); if (!success) { - throw new ExchangeTimedOutException(exchange, configuration.getReceiveTimeoutMillis()); + throw new ExchangeTimedOutException(exchange, configuration.getTimeout()); } ClientChannelHandler handler = (ClientChannelHandler) clientPipeline.get("handler"); @@ -189,7 +189,7 @@ public class NettyProducer extends Defau clientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); clientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); clientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); - clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis()); + clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout()); } if (clientPipelineFactory == null) { clientPipelineFactory = new ClientPipelineFactory(this); @@ -209,7 +209,7 @@ public class NettyProducer extends Defau connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); connectionlessClientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress()); - connectionlessClientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis()); + connectionlessClientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout()); connectionlessClientBootstrap.setOption("child.broadcast", configuration.isBroadcast()); connectionlessClientBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize()); connectionlessClientBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize()); Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=941765&r1=941764&r2=941765&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Thu May 6 15:01:34 2010 @@ -50,15 +50,16 @@ public class ServerChannelHandler extend } @Override + public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + LOG.debug("Channel closed: " + e.getChannel()); + } + + @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause()); - } + LOG.warn("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause()); + // close channel in case an exception was thrown NettyHelper.close(exceptionEvent.getChannel()); - - // must wrap and rethrow since cause can be of Throwable and we must only throw Exception - throw new CamelException(exceptionEvent.getCause()); } @Override Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java?rev=941765&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java (added) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java Thu May 6 15:01:34 2010 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * @version $Revision$ + */ +public class NettyComponentWithConfigurationTest extends CamelTestSupport { + + @Test + public void testMinaComponentWithConfiguration() throws Exception { + NettyComponent comp = context.getComponent("netty", NettyComponent.class); + + NettyConfiguration cfg = new NettyConfiguration(); + cfg.setTimeout(15000); + + comp.setConfiguration(cfg); + assertSame(cfg, comp.getConfiguration()); + + NettyEndpoint e1 = (NettyEndpoint) comp.createEndpoint("netty://tcp://localhost:4455"); + NettyEndpoint e2 = (NettyEndpoint) comp.createEndpoint("netty://tcp://localhost:5566?sync=false"); + + // should not be same + assertNotSame(e1, e2); + assertNotSame(e1.getConfiguration(), e2.getConfiguration()); + + e2.getConfiguration().setPort(5566); + + assertEquals(true, e1.getConfiguration().isSync()); + assertEquals(false, e2.getConfiguration().isSync()); + assertEquals(15000, e1.getConfiguration().getTimeout()); + assertEquals(15000, e2.getConfiguration().getTimeout()); + assertEquals(4455, e1.getConfiguration().getPort()); + assertEquals(5566, e2.getConfiguration().getPort()); + } + +} Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyComponentWithConfigurationTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java?rev=941765&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java (added) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java Thu May 6 15:01:34 2010 @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * @version $Revision$ + */ +public class NettyExchangeTimeoutTest extends CamelTestSupport { + + private static final int PORT = 6336; + protected String uri = "netty:tcp://localhost:" + PORT; + + @Test + public void testUsingTimeoutParameter() throws Exception { + // use a timeout value of 2 seconds (timeout is in millis) so we should actually get a response in this test + Endpoint endpoint = this.context.getEndpoint("netty:tcp://localhost:" + PORT + "?timeout=2000"); + Producer producer = endpoint.createProducer(); + producer.start(); + Exchange exchange = producer.createExchange(); + exchange.getIn().setBody("Hello World"); + try { + producer.process(exchange); + fail("Should have thrown an ExchangeTimedOutException wrapped in a RuntimeCamelException"); + } catch (Exception e) { + assertTrue("Should have thrown an ExchangeTimedOutException", e instanceof ExchangeTimedOutException); + } + producer.stop(); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from(uri).process(new Processor() { + public void process(Exchange e) throws Exception { + assertEquals("Hello World", e.getIn().getBody(String.class)); + // MinaProducer has a default timeout of 30 seconds so we just wait 5 seconds + // (template.requestBody is a MinaProducer behind the doors) + Thread.sleep(5000); + + e.getOut().setBody("Okay I will be faster in the future"); + } + }); + } + }; + } + +} Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyExchangeTimeoutTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date