CAMEL-10756 Mina2 Producer "hang" until timeout if the response message could not be decoded * Adapt exception caught
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6adb9216 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6adb9216 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6adb9216 Branch: refs/heads/camel-2.18.x Commit: 6adb92164a1f4ecc4b434498c0ce3b4827b26fef Parents: 11ba337 Author: Thomas Papke <thomas.pa...@icw.de> Authored: Mon Feb 6 13:32:47 2017 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Feb 7 18:41:30 2017 +0100 ---------------------------------------------------------------------- .../camel/component/mina2/Mina2Producer.java | 8 ++-- .../component/mina2/Mina2CustomCodecTest.java | 49 ++++++++++++++++++++ 2 files changed, 52 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6adb9216/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java index fc7b4a2..7dd1795 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java @@ -111,6 +111,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { return false; } + @Override public void process(Exchange exchange) throws Exception { try { doProcess(exchange); @@ -512,11 +513,8 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware { this.messageReceived = false; this.cause = cause; if (ioSession != null) { - try { - closeSessionIfNeededAndAwaitCloseInHandler(ioSession); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + CloseFuture closeFuture = ioSession.closeNow(); + closeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS); } } http://git-wip-us.apache.org/repos/asf/camel/blob/6adb9216/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java index 427a0ab..3a86190 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java @@ -16,6 +16,10 @@ */ package org.apache.camel.component.mina2; +import java.util.Optional; +import java.util.stream.Stream; + +import org.apache.camel.CamelExecutionException; import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -48,9 +52,29 @@ public class Mina2CustomCodecTest extends BaseMina2Test { } @Test + public void testProducerFailInDecodingResponse() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + try { + template.requestBody(String.format("mina2:tcp://localhost:%1$s?sync=true&codec=#failingCodec", getPort()), "Hello World"); + fail("Expecting that decode of result fails"); + } catch (Exception e){ + assertTrue(e instanceof CamelExecutionException); + Optional<Throwable> rootCause = Stream.iterate(e, Throwable::getCause) + .filter(element -> element.getCause() == null).findFirst(); + assertTrue(rootCause.isPresent()); + assertTrue(rootCause.get() instanceof IllegalArgumentException); + assertTrue(rootCause.get().getMessage().contains("Something went wrong in decode")); + } + + } + + @Test public void testTCPEncodeUTF8InputIsString() throws Exception { final String myUri = String.format("mina2:tcp://localhost:%1$s?encoding=UTF-8&sync=false", getNextPort()); context.addRoutes(new RouteBuilder() { + @Override public void configure() { from(myUri).to("mock:result"); } @@ -78,15 +102,19 @@ public class Mina2CustomCodecTest extends BaseMina2Test { } } + @Override protected JndiRegistry createRegistry() throws Exception { JndiRegistry jndi = super.createRegistry(); jndi.bind("myCodec", new MyCodec()); + jndi.bind("failingCodec", new MyCodec(true)); return jndi; } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { + @Override public void configure() throws Exception { from(String.format("mina2:tcp://localhost:%1$s?sync=true&codec=#myCodec", getPort())).transform(constant("Bye World")).to("mock:result"); } @@ -95,9 +123,22 @@ public class Mina2CustomCodecTest extends BaseMina2Test { private static class MyCodec implements ProtocolCodecFactory { + private final boolean failing; + + public MyCodec(boolean failing) { + this.failing = failing; + + } + + public MyCodec() { + this.failing = false; + } + + @Override public ProtocolEncoder getEncoder(IoSession session) throws Exception { return new ProtocolEncoder() { + @Override public void encode(IoSession ioSession, Object message, ProtocolEncoderOutput out) throws Exception { IoBuffer bb = IoBuffer.allocate(32).setAutoExpand(true); @@ -107,6 +148,7 @@ public class Mina2CustomCodecTest extends BaseMina2Test { out.write(bb); } + @Override public void dispose(IoSession ioSession) throws Exception { // do nothing } @@ -114,10 +156,17 @@ public class Mina2CustomCodecTest extends BaseMina2Test { } + @Override public ProtocolDecoder getDecoder(IoSession session) throws Exception { return new CumulativeProtocolDecoder() { + @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { + if (failing){ + throw new IllegalArgumentException("Something went wrong in decode"); + } + + if (in.remaining() > 0) { byte[] buf = new byte[in.remaining()]; in.get(buf);