Updated Branches: refs/heads/master b724bcce7 -> 33342ad04
CAMEL-6644: netty suspend/resume now unbinds the acceptor. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/33342ad0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/33342ad0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/33342ad0 Branch: refs/heads/master Commit: 33342ad04740f678913d44d361cb6532fd4b4393 Parents: b724bcc Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Aug 16 15:17:35 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Aug 16 15:17:35 2013 +0200 ---------------------------------------------------------------------- .../netty/http/NettyHttpConfiguration.java | 9 +++ .../component/netty/http/NettyHttpConsumer.java | 22 ++++++- .../http/NettyHttpSuspendResume503Test.java | 69 ++++++++++++++++++++ .../netty/http/NettyHttpSuspendResumeTest.java | 5 +- .../camel/component/netty/NettyConsumer.java | 10 +++ .../netty/NettyServerBootstrapFactory.java | 4 +- .../SingleTCPNettyServerBootstrapFactory.java | 25 +++++++ .../SingleUDPNettyServerBootstrapFactory.java | 23 +++++-- .../component/netty/NettySuspendResumeTest.java | 61 +++++++++++++++++ 9 files changed, 216 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java index 94500e9..509ea02 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java +++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java @@ -37,6 +37,7 @@ public class NettyHttpConfiguration extends NettyConfiguration { private boolean bridgeEndpoint; private String path; private boolean disableStreamCache; + private boolean send503whenSuspended = true; public NettyHttpConfiguration() { // we need sync=true as http is request/reply by nature @@ -133,4 +134,12 @@ public class NettyHttpConfiguration extends NettyConfiguration { public void setDisableStreamCache(boolean disableStreamCache) { this.disableStreamCache = disableStreamCache; } + + public boolean isSend503whenSuspended() { + return send503whenSuspended; + } + + public void setSend503whenSuspended(boolean send503whenSuspended) { + this.send503whenSuspended = send503whenSuspended; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java index 4943ac3..02486c4 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java @@ -45,8 +45,6 @@ public class NettyHttpConsumer extends NettyConsumer { super.doStart(); ObjectHelper.notNull(getNettyServerBootstrapFactory(), "HttpServerBootstrapFactory", this); getNettyServerBootstrapFactory().addConsumer(this); - - } @Override @@ -54,4 +52,24 @@ public class NettyHttpConsumer extends NettyConsumer { getNettyServerBootstrapFactory().removeConsumer(this); super.doStop(); } + + @Override + protected void doSuspend() throws Exception { + if 
(getConfiguration().isSend503whenSuspended()) { + // noop as the server handler will send back 503 when suspended + } else { + // will unbind the acceptor + super.doSuspend(); + } + } + + @Override + protected void doResume() throws Exception { + if (getConfiguration().isSend503whenSuspended()) { + // noop + } else { + // will resume the acceptor + super.doResume(); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResume503Test.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResume503Test.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResume503Test.java new file mode 100644 index 0000000..373e439 --- /dev/null +++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResume503Test.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty.http; + +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class NettyHttpSuspendResume503Test extends BaseNettyTest { + + private String serverUri = "netty-http:http://localhost:" + getPort() + "/cool?disconnect=true"; + + @Test + public void testNettySuspendResume() throws Exception { + context.getShutdownStrategy().setTimeout(50); + + String reply = template.requestBody(serverUri, "World", String.class); + assertEquals("Bye World", reply); + + // now suspend netty + NettyHttpConsumer consumer = (NettyHttpConsumer) context.getRoute("foo").getConsumer(); + assertNotNull(consumer); + + // suspend + consumer.suspend(); + + try { + template.requestBody(serverUri, "Moon", String.class); + fail("Should throw exception"); + } catch (Exception e) { + NettyHttpOperationFailedException cause = assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause()); + assertEquals(503, cause.getStatusCode()); + } + + // resume + consumer.resume(); + + Thread.sleep(2000); + + // and send request which should be processed + reply = template.requestBody(serverUri, "Moon", String.class); + assertEquals("Bye Moon", reply); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(serverUri).routeId("foo") + .transform(body().prepend("Bye ")); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java index 5aa86d7..dbc4893 100644 --- 
a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java +++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java @@ -21,7 +21,7 @@ import org.junit.Test; public class NettyHttpSuspendResumeTest extends BaseNettyTest { - private String serverUri = "netty-http:http://localhost:" + getPort() + "/cool"; + private String serverUri = "netty-http:http://localhost:" + getPort() + "/cool?disconnect=true&send503whenSuspended=false"; @Test public void testNettySuspendResume() throws Exception { @@ -41,8 +41,7 @@ public class NettyHttpSuspendResumeTest extends BaseNettyTest { template.requestBody(serverUri, "Moon", String.class); fail("Should throw exception"); } catch (Exception e) { - NettyHttpOperationFailedException cause = assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause()); - assertEquals(503, cause.getStatusCode()); + assertTrue(e.getCause().getMessage().startsWith("Cannot connect to localhost")); } // resume http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java index 938ef96..fa8b06a 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java @@ -82,6 +82,16 @@ public class NettyConsumer extends DefaultConsumer { super.doStop(); } + @Override + protected void doSuspend() throws Exception { + ServiceHelper.suspendService(nettyServerBootstrapFactory); + } + + @Override + protected void doResume() throws Exception { + 
ServiceHelper.resumeService(nettyServerBootstrapFactory); + } + public CamelContext getContext() { return context; } http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java index 18dfb4b..af28160 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java @@ -19,7 +19,7 @@ package org.apache.camel.component.netty; import java.util.concurrent.ThreadFactory; import org.apache.camel.CamelContext; -import org.apache.camel.Service; +import org.apache.camel.SuspendableService; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelPipelineFactory; @@ -30,7 +30,7 @@ import org.jboss.netty.channel.ChannelPipelineFactory; * This factory allows for consumers to reuse existing {@link org.jboss.netty.bootstrap.ServerBootstrap} which * allows to share the same port for multiple consumers. */ -public interface NettyServerBootstrapFactory extends Service { +public interface NettyServerBootstrapFactory extends SuspendableService { /** * Initializes this <b>non-shared</b> {@link NettyServerBootstrapFactory}. 
http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java index 9e06622..97f3395 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java @@ -25,6 +25,7 @@ import org.apache.camel.support.ServiceSupport; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; @@ -97,6 +98,30 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme stopServerBootstrap(); } + @Override + protected void doResume() throws Exception { + if (channel != null) { + LOG.debug("ServerBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort()); + ChannelFuture future = channel.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + future.awaitUninterruptibly(); + if (!future.isSuccess()) { + // if we cannot bind, then re-create the channel + allChannels.remove(channel); + channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + allChannels.add(channel); + } + } + } + + @Override + protected void doSuspend() throws Exception { + if (channel != null) { + LOG.debug("ServerBootstrap unbinding from 
{}:{}", configuration.getHost(), configuration.getPort()); + ChannelFuture future = channel.unbind(); + future.awaitUninterruptibly(); + } + } + protected void startServerBootstrap() { // prefer using explicit configured thread pools BossPool bp = configuration.getBossPool(); http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java index d73f67a..20faf1a 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java @@ -57,6 +57,9 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme private ChannelPipelineFactory pipelineFactory; private DatagramChannelFactory datagramChannelFactory; private ConnectionlessBootstrap connectionlessBootstrap; + private NetworkInterface multicastNetworkInterface; + private DatagramChannel datagramChannel; + private Channel channel; private WorkerPool workerPool; public SingleUDPNettyServerBootstrapFactory() { @@ -104,6 +107,16 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme stopServerBootstrap(); } + @Override + protected void doResume() throws Exception { + // noop + } + + @Override + protected void doSuspend() throws Exception { + // noop + } + protected void startServerBootstrap() throws UnknownHostException, SocketException { // create non-shared worker pool int count = configuration.getWorkerCount() > 0 ? 
configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS; @@ -145,15 +158,15 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme IpV4Subnet multicastSubnet = new IpV4Subnet(MULTICAST_SUBNET); if (multicastSubnet.contains(configuration.getHost())) { - DatagramChannel channel = (DatagramChannel)connectionlessBootstrap.bind(hostAddress); + datagramChannel = (DatagramChannel)connectionlessBootstrap.bind(hostAddress); String networkInterface = configuration.getNetworkInterface() == null ? LOOPBACK_INTERFACE : configuration.getNetworkInterface(); - NetworkInterface multicastNetworkInterface = NetworkInterface.getByName(networkInterface); + multicastNetworkInterface = NetworkInterface.getByName(networkInterface); LOG.info("ConnectionlessBootstrap joining {}:{} using network interface: {}", new Object[]{configuration.getHost(), configuration.getPort(), multicastNetworkInterface.getName()}); - channel.joinGroup(hostAddress, multicastNetworkInterface); - allChannels.add(channel); + datagramChannel.joinGroup(hostAddress, multicastNetworkInterface); + allChannels.add(datagramChannel); } else { LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort()); - Channel channel = connectionlessBootstrap.bind(hostAddress); + channel = connectionlessBootstrap.bind(hostAddress); allChannels.add(channel); } } http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySuspendResumeTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySuspendResumeTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySuspendResumeTest.java new file mode 100644 index 0000000..df66492 --- /dev/null +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySuspendResumeTest.java @@ -0,0 
+1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class NettySuspendResumeTest extends BaseNettyTest { + + @Test + public void testSuspendResume() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Camel", "Again"); + + String out = template.requestBody("netty:tcp://localhost:{{port}}?sync=true&disconnect=true", "Camel", String.class); + assertEquals("Bye Camel", out); + + context.suspendRoute("foo"); + + try { + template.requestBody("netty:tcp://localhost:{{port}}?sync=true&disconnect=true", "World", String.class); + fail("Should not allow connecting as its suspended"); + } catch (Exception e) { + // expected + } + + context.resumeRoute("foo"); + + out = template.requestBody("netty:tcp://localhost:{{port}}?sync=true&disconnect=true", "Again", String.class); + assertEquals("Bye Again", out); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("netty:tcp://localhost:{{port}}?sync=true").routeId("foo") + 
.to("log:result") + .to("mock:result") + .transform(body().prepend("Bye ")); + } + }; + } + +}