Repository: camel
Updated Branches:
  refs/heads/camel-2.13.x 09d34bd3a -> 387f571cd
  refs/heads/camel-2.14.x 9bfb2092c -> 34cd058ce
CAMEL-8204 Throw exception when the correlation id is not unique

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/34cd058c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/34cd058c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/34cd058c

Branch: refs/heads/camel-2.14.x
Commit: 34cd058ced4d1b8c1e8b02bdcafd6c573aad6f2d
Parents: 9bfb209
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Thu Jan 8 22:48:44 2015 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Fri Jan 9 15:20:39 2015 +0800

----------------------------------------------------------------------
 .../component/jms/reply/QueueReplyManager.java | 15 +++----
 .../jms/reply/ReplyManagerSupport.java | 19 +++++++++
 .../jms/reply/TemporaryQueueReplyManager.java | 12 ++----
 .../jms/JmsRequestReplyCorrelationTest.java | 42 ++++++++++++++++++++
 4 files changed, 69 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/34cd058c/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index e494b83..eef52de 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.jms.reply;
 
 import java.math.BigInteger;
 import java.util.Random;
+
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -46,17 +47,11 @@ public class QueueReplyManager extends ReplyManagerSupport {
super(camelContext); } - public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, - String originalCorrelationId, String correlationId, long requestTimeout) { - // add to correlation map - QueueReplyHandler handler = new QueueReplyHandler(replyManager, exchange, callback, + protected ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, + String originalCorrelationId, String correlationId, long requestTimeout) { + return new QueueReplyHandler(replyManager, exchange, callback, originalCorrelationId, correlationId, requestTimeout); - ReplyHandler result = correlation.put(correlationId, handler, requestTimeout); - if (result != null) { - log.warn("The correlationId [{}] is not unique, some reply message would be ignored and the request thread could be blocked.", correlationId); - } - return correlationId; - } + } public void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout) { log.trace("Updated provisional correlationId [{}] to expected correlationId [{}]", correlationId, newCorrelationId); http://git-wip-us.apache.org/repos/asf/camel/blob/34cd058c/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index a93a36b..ff2f344 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -19,6 +19,7 @@ package org.apache.camel.component.jms.reply; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import 
java.util.concurrent.TimeUnit; + import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -94,6 +95,24 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl } return replyTo; } + + public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, + String originalCorrelationId, String correlationId, long requestTimeout) { + // add to correlation map + QueueReplyHandler handler = new QueueReplyHandler(replyManager, exchange, callback, + originalCorrelationId, correlationId, requestTimeout); + ReplyHandler result = correlation.put(correlationId, handler, requestTimeout); + if (result != null) { + String logMessage = String.format("The correlationId [%s] is not unique.", correlationId); + log.warn("{}, some reply message would be ignored and the request thread could be blocked.", logMessage); + throw new IllegalArgumentException(logMessage); + } + return correlationId; + } + + + protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, + String originalCorrelationId, String correlationId, long requestTimeout); public void onMessage(Message message) { String correlationID = null; http://git-wip-us.apache.org/repos/asf/camel/blob/34cd058c/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java index 788994b..54915e0 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java @@ -57,15 +57,9 @@ public class 
TemporaryQueueReplyManager extends ReplyManagerSupport { return super.getReplyTo(); } - public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, - String originalCorrelationId, String correlationId, long requestTimeout) { - // add to correlation map - TemporaryQueueReplyHandler handler = new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, correlationId, requestTimeout); - ReplyHandler result = correlation.put(correlationId, handler, requestTimeout); - if (result != null) { - log.error("The correlationId [{}] is not unique, some reply message would be ignored and the request thread could be blocked.", correlationId); - } - return correlationId; + protected ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, + String originalCorrelationId, String correlationId, long requestTimeout) { + return new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, correlationId, requestTimeout); } public void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout) { http://git-wip-us.apache.org/repos/asf/camel/blob/34cd058c/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java index d535f95..634e975 100644 --- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java @@ -62,6 +62,41 @@ public class JmsRequestReplyCorrelationTest extends CamelTestSupport { assertEquals(REPLY_BODY, out.getOut().getBody(String.class)); 
assertEquals("a", out.getOut().getHeader("JMSCorrelationID")); } + + /** + * As the correlationID should be unique when receiving the reply message, + * now we just expect to get an exception here. + */ + @Test + public void testRequestReplyCorrelationWithDuplicateId() throws Exception { + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(1); + + // just send out the request to fill the correlation id first + template.asyncSend("jms:queue:helloDelay", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.setPattern(ExchangePattern.InOut); + Message in = exchange.getIn(); + in.setBody("Hello World"); + in.setHeader("JMSCorrelationID", "b"); + } + }); + + Exchange out = template.send("jms:queue:helloDelay", ExchangePattern.InOut, new Processor() { + public void process(Exchange exchange) throws Exception { + Message in = exchange.getIn(); + in.setBody("Hello World"); + in.setHeader("JMSCorrelationID", "b"); + } + }); + + result.assertIsSatisfied(); + + assertNotNull("We are expecting the exception here!", out.getException()); + assertTrue("Get a wrong exception", out.getException() instanceof IllegalArgumentException); + + } + /** * When the setting useMessageIdAsCorrelationid is false and @@ -211,6 +246,13 @@ public class JmsRequestReplyCorrelationTest extends CamelTestSupport { assertNotNull(exchange.getIn().getHeader("JMSReplyTo")); } }).to("mock:result"); + + from("jms:queue:helloDelay").delay().constant(2000).process(new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody(REPLY_BODY); + assertNotNull(exchange.getIn().getHeader("JMSReplyTo")); + } + }).to("mock:result"); } }; }