CAMEL-7500: In case of redelivery, the netty producer would cause two tasks to attempt redelivery. Thanks to Bob Browning for reporting the issue and providing the test case.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ba737e77 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ba737e77 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ba737e77 Branch: refs/heads/camel-2.15.x Commit: ba737e77eb1c4abf511c66f1ee0914490f98388c Parents: b2e8d46 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jul 10 10:54:39 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 10 10:55:12 2015 +0200 ---------------------------------------------------------------------- .../camel/component/netty/NettyProducer.java | 4 +- .../component/netty/NettyRedeliveryTest.java | 219 +++++++++++++++++++ .../netty/NettyUdpConnectedSendTest.java | 8 +- 3 files changed, 226 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ba737e77/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java index ac1ecef..bf72284 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java @@ -264,9 +264,7 @@ public class NettyProducer extends DefaultAsyncProducer { public void operationComplete(ChannelFuture channelFuture) throws Exception { LOG.trace("Operation complete {}", channelFuture); if (!channelFuture.isSuccess()) { - // no success the set the caused exception and signal callback and break - exchange.setException(channelFuture.getCause()); - producerCallback.done(false); + // no success then exit, (any exception has been handled by 
ClientChannelHandler#exceptionCaught) return; } http://git-wip-us.apache.org/repos/asf/camel/blob/ba737e77/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRedeliveryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRedeliveryTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRedeliveryTest.java new file mode 100644 index 0000000..af7ff76 --- /dev/null +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRedeliveryTest.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.Deque; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.camel.CamelContext; +import org.apache.camel.EndpointInject; +import org.apache.camel.LoggingLevel; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Test the effect of redelivery in association with netty component. + */ +public class NettyRedeliveryTest extends CamelTestSupport { + + /** + * Body of sufficient size such that it doesn't fit into the TCP buffer and has to be read. + */ + private static final byte[] LARGE_BUFFER_BODY = new byte[1000000]; + + /** + * Failure will occur with 2 redeliveries however is increasingly more likely the more it retries. 
+ */ + private static final int REDELIVERY_COUNT = 100; + + private ExecutorService listener = Executors.newSingleThreadExecutor(); + + @EndpointInject(uri = "mock:exception") + private MockEndpoint exception; + + @EndpointInject(uri = "mock:downstream") + private MockEndpoint downstream; + + private Deque<Callable<?>> tasks = new LinkedBlockingDeque<Callable<?>>(); + private int port; + private boolean alive = true; + + @Override + protected void doPreSetup() throws Exception { + // Create a server to attempt to connect to + port = createServerSocket(0); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class) + .maximumRedeliveries(REDELIVERY_COUNT) + .retryAttemptedLogLevel(LoggingLevel.INFO) + .retriesExhaustedLogLevel(LoggingLevel.ERROR) + // lets have a little delay so we do async redelivery + .redeliveryDelay(10) + .to("mock:exception") + .handled(true); + + from("direct:start") + .routeId("start") + .to("netty:tcp://localhost:" + port) + .to("log:downstream") + .to("mock:downstream"); + } + }; + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + alive = false; + listener.shutdown(); + } + + @Test + public void testExceptionHandler() throws Exception { + /* + * We should have 0 for this as it should never be successful however it is usual that this actually returns 1. + * + * This is because two or more threads run concurrently and will setException(null) which is checked during + * redelivery to ascertain whether the delivery was successful, this leads to multiple downstream invocations being + * possible. 
+ */ + downstream.setExpectedMessageCount(0); + downstream.setAssertPeriod(1000); + + exception.setExpectedMessageCount(1); + + sendBody("direct:start", LARGE_BUFFER_BODY); + + exception.assertIsSatisfied(); + + // given 100 retries usually yields somewhere around -95 + // assertEquals(0, context.getInflightRepository().size("start")); + + // Verify the number of tasks submitted - sometimes both callbacks add a task + assertEquals(REDELIVERY_COUNT, tasks.size()); + + // Verify the downstream completed messages - othertimes one callback gets treated as done + downstream.assertIsSatisfied(); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + // Override the error handler executor service such that we can track the tasks created + CamelContext context = new DefaultCamelContext(createRegistry()) { + @Override + public ScheduledExecutorService getErrorHandlerExecutorService() { + return getScheduledExecutorService(); + } + }; + return context; + } + + private ScheduledExecutorService getScheduledExecutorService() { + final ScheduledExecutorService delegate = Executors.newScheduledThreadPool(10); + return newProxy(ScheduledExecutorService.class, new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if ("submit".equals(method.getName()) || "schedule".equals(method.getName())) { + tasks.add((Callable<?>) args[0]); + } + return method.invoke(delegate, args); + } + }); + } + + private int createServerSocket(int port) throws IOException { + final ServerSocket listen = new ServerSocket(port); + listen.setSoTimeout(100); + listener.execute(new Runnable() { + + private ExecutorService pool = Executors.newCachedThreadPool(); + + @Override + public void run() { + try { + while (alive) { + try { + pool.execute(new ClosingClientRunnable(listen.accept())); + } catch (SocketTimeoutException ignored) { + // Allow the server socket to terminate in a timely fashion + } + } + } catch 
(IOException e) { + throw new RuntimeException(e); + } finally { + try { + listen.close(); + } catch (IOException ignored) { + } + } + } + }); + return listen.getLocalPort(); + } + + private static <T> T newProxy(Class<T> interfaceType, InvocationHandler handler) { + Object object = Proxy.newProxyInstance(interfaceType.getClassLoader(), new Class<?>[]{interfaceType}, handler); + return interfaceType.cast(object); + } + + /** + * Handler for client connection. + */ + private class ClosingClientRunnable implements Runnable { + private final Socket socket; + + public ClosingClientRunnable(Socket socket) { + this.socket = socket; + } + + @Override + public void run() { + try { + Thread.sleep(10); + socket.close(); + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + try { + socket.close(); + } catch (IOException ignored) { + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/ba737e77/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java index bcda6eb..ddd4c0a 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java @@ -29,9 +29,13 @@ import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.util.CharsetUtil; +import org.junit.FixMethodOrder; +import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.MethodSorters; - +//We need to run the tests with fix order 
+@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class NettyUdpConnectedSendTest extends BaseNettyTest { private static final String SEND_STRING = "***<We all love camel>***"; private static final int SEND_COUNT = 20; @@ -49,7 +53,6 @@ public class NettyUdpConnectedSendTest extends BaseNettyTest { return channelPipeline; } }); - } @@ -73,6 +76,7 @@ public class NettyUdpConnectedSendTest extends BaseNettyTest { } @Test + @Ignore("This test would be failed in JDK7 sometimes") public void sendConnectedWithoutReceiver() throws Exception { int exceptionCount = 0; for (int i = 0; i < SEND_COUNT; ++i) {