This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5d7dcf2a741d5d5fe4e81355559d0872127236b7 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Jan 3 09:57:02 2018 +0100 CAMEL-12103: camel-netty4 consumer should not be suspendable as it should really stop taking in new requests when stopping/shutting-down etc, as otherwise it keeps running if new requests come in faster than it can process. The camel-netty4-http is suspendable by sending back HTTP status 503. Thanks to Yih Tsern for reporting and providing a reproducer. --- .../component/netty4/http/NettyHttpConsumer.java | 11 ++++---- .../ClientModeTCPNettyServerBootstrapFactory.java | 19 +------------- .../camel/component/netty4/NettyConsumer.java | 15 +---------- .../SingleTCPNettyServerBootstrapFactory.java | 29 +--------------------- .../SingleUDPNettyServerBootstrapFactory.java | 13 +--------- 5 files changed, 10 insertions(+), 77 deletions(-) diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConsumer.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConsumer.java index edfea84..e335401 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConsumer.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConsumer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.netty4.http; import org.apache.camel.Processor; +import org.apache.camel.Suspendable; import org.apache.camel.component.netty4.NettyConfiguration; import org.apache.camel.component.netty4.NettyConsumer; import org.apache.camel.util.ObjectHelper; @@ -24,7 +25,7 @@ import org.apache.camel.util.ObjectHelper; /** * HTTP based {@link NettyConsumer} */ -public class NettyHttpConsumer extends NettyConsumer { +public class NettyHttpConsumer extends NettyConsumer implements Suspendable { public NettyHttpConsumer(NettyHttpEndpoint nettyEndpoint, Processor processor, NettyConfiguration 
configuration) { super(nettyEndpoint, processor, configuration); @@ -58,8 +59,8 @@ public class NettyHttpConsumer extends NettyConsumer { if (getConfiguration().isSend503whenSuspended()) { // noop as the server handler will send back 503 when suspended } else { - // will unbind the acceptor - super.doSuspend(); + // will stop the acceptor + doStop(); } } @@ -68,8 +69,8 @@ public class NettyHttpConsumer extends NettyConsumer { if (getConfiguration().isSend503whenSuspended()) { // noop } else { - // will resume the acceptor - super.doResume(); + // will start the acceptor + doStart(); } } } diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java index 653da19..204d1c1 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java @@ -33,7 +33,6 @@ import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import org.apache.camel.CamelContext; -import org.apache.camel.Suspendable; import org.apache.camel.support.ServiceSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +40,7 @@ import org.slf4j.LoggerFactory; /** * A {@link NettyServerBootstrapFactory} which is used by a single consumer (not shared). 
*/ -public class ClientModeTCPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory, Suspendable { +public class ClientModeTCPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory { protected static final Logger LOG = LoggerFactory.getLogger(ClientModeTCPNettyServerBootstrapFactory.class); @@ -97,22 +96,6 @@ public class ClientModeTCPNettyServerBootstrapFactory extends ServiceSupport imp stopServerBootstrap(); } - @Override - protected void doResume() throws Exception { - LOG.debug("ClientModeServerBootstrap connect to {}:{}", configuration.getHost(), configuration.getPort()); - ChannelFuture connectFuture = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort())); - channel = openChannel(connectFuture); - } - - @Override - protected void doSuspend() throws Exception { - if (channel != null) { - LOG.debug("ClientModeServerBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort()); - channel.close().sync(); - channel = null; - } - } - protected void startServerBootstrap() throws Exception { // prefer using explicit configured thread pools diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumer.java index fbee132..bbcdac5 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumer.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumer.java @@ -18,13 +18,12 @@ package org.apache.camel.component.netty4; import org.apache.camel.CamelContext; import org.apache.camel.Processor; -import org.apache.camel.Suspendable; import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NettyConsumer extends DefaultConsumer implements 
Suspendable { +public class NettyConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(NettyConsumer.class); private CamelContext context; private NettyConfiguration configuration; @@ -87,18 +86,6 @@ public class NettyConsumer extends DefaultConsumer implements Suspendable { super.doStop(); } - @Override - protected void doSuspend() throws Exception { - ServiceHelper.suspendService(nettyServerBootstrapFactory); - super.doSuspend(); - } - - @Override - protected void doResume() throws Exception { - ServiceHelper.resumeService(nettyServerBootstrapFactory); - super.doResume(); - } - public CamelContext getContext() { return context; } diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java index 887113f..27ea50c 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java @@ -33,7 +33,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.ImmediateEventExecutor; import org.apache.camel.CamelContext; -import org.apache.camel.Suspendable; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.EndpointHelper; @@ -43,7 +42,7 @@ import org.slf4j.LoggerFactory; /** * A {@link NettyServerBootstrapFactory} which is used by a single consumer (not shared). 
*/ -public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory, Suspendable { +public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory { protected static final Logger LOG = LoggerFactory.getLogger(SingleTCPNettyServerBootstrapFactory.class); private ChannelGroup allChannels; @@ -108,32 +107,6 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme stopServerBootstrap(); } - @Override - protected void doResume() throws Exception { - if (channel != null) { - LOG.debug("ServerBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort()); - ChannelFuture future = channel.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); - future.awaitUninterruptibly(); - if (!future.isSuccess()) { - // if we cannot bind, the re-create channel - allChannels.remove(channel); - future = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())).sync(); - channel = future.channel(); - allChannels.add(channel); - } - } - } - - @Override - protected void doSuspend() throws Exception { - if (channel != null) { - LOG.debug("ServerBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort()); - //TODO need to check if it's good way to unbinding the channel - ChannelFuture future = channel.close(); - future.awaitUninterruptibly(); - } - } - protected void startServerBootstrap() throws Exception { // prefer using explicit configured thread pools EventLoopGroup bg = configuration.getBossGroup(); diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java index 3f68bd6..adba145 100644 --- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java @@ -35,7 +35,6 @@ import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.util.concurrent.ImmediateEventExecutor; import org.apache.camel.CamelContext; -import org.apache.camel.Suspendable; import org.apache.camel.component.netty4.util.SubnetUtils; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.CamelContextHelper; @@ -47,7 +46,7 @@ import org.slf4j.LoggerFactory; /** * A {@link NettyServerBootstrapFactory} which is used by a single consumer (not shared). */ -public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory, Suspendable { +public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory { protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class); private static final String LOOPBACK_INTERFACE = "lo"; @@ -106,16 +105,6 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme stopServerBootstrap(); } - @Override - protected void doResume() throws Exception { - // noop - } - - @Override - protected void doSuspend() throws Exception { - // noop - } - protected void startServerBootstrap() throws Exception { // create non-shared worker pool EventLoopGroup wg = configuration.getWorkerGroup(); -- To stop receiving notification emails like this one, please contact "commits@camel.apache.org" <commits@camel.apache.org>.