CAMEL-8204 Throw exception when the correlation id is not unique

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/387f571c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/387f571c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/387f571c

Branch: refs/heads/camel-2.13.x
Commit: 387f571cd39d254c62ebf11e82146ecbc25559bf
Parents: 09d34bd
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Thu Jan 8 22:48:44 2015 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Fri Jan 9 15:20:53 2015 +0800

----------------------------------------------------------------------
 .../component/jms/reply/QueueReplyManager.java  | 15 +++----
 .../jms/reply/ReplyManagerSupport.java          | 19 +++++++++
 .../jms/reply/TemporaryQueueReplyManager.java   | 12 ++----
 .../jms/JmsRequestReplyCorrelationTest.java     | 42 ++++++++++++++++++++
 4 files changed, 69 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/387f571c/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index e494b83..eef52de 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.jms.reply;
 
 import java.math.BigInteger;
 import java.util.Random;
+
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -46,17 +47,11 @@ public class QueueReplyManager extends ReplyManagerSupport {
         super(camelContext);
     }
 
-    public String registerReply(ReplyManager replyManager, Exchange exchange, 
AsyncCallback callback,
-                                String originalCorrelationId, String 
correlationId, long requestTimeout) {
-        // add to correlation map
-        QueueReplyHandler handler = new QueueReplyHandler(replyManager, 
exchange, callback,
+    protected ReplyHandler createReplyHandler(ReplyManager replyManager, 
Exchange exchange, AsyncCallback callback,
+                                              String originalCorrelationId, 
String correlationId, long requestTimeout) {
+        return new QueueReplyHandler(replyManager, exchange, callback,
                 originalCorrelationId, correlationId, requestTimeout);
-        ReplyHandler result = correlation.put(correlationId, handler, 
requestTimeout);
-        if (result != null) {
-            log.warn("The correlationId [{}] is not unique, some reply message 
would be ignored and the request thread could be blocked.", correlationId);
-        }
-        return correlationId;
-    }
+    }  
 
     public void updateCorrelationId(String correlationId, String 
newCorrelationId, long requestTimeout) {
         log.trace("Updated provisional correlationId [{}] to expected 
correlationId [{}]", correlationId, newCorrelationId);

http://git-wip-us.apache.org/repos/asf/camel/blob/387f571c/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index a93a36b..ff2f344 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.jms.reply;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -94,6 +95,24 @@ public abstract class ReplyManagerSupport extends 
ServiceSupport implements Repl
         }
         return replyTo;
     }
+    
+    public String registerReply(ReplyManager replyManager, Exchange exchange, 
AsyncCallback callback,
+                                String originalCorrelationId, String 
correlationId, long requestTimeout) {
+        // add to correlation map
+        QueueReplyHandler handler = new QueueReplyHandler(replyManager, 
exchange, callback,
+                originalCorrelationId, correlationId, requestTimeout);
+        ReplyHandler result = correlation.put(correlationId, handler, 
requestTimeout);
+        if (result != null) {
+            String logMessage = String.format("The correlationId [%s] is not 
unique.", correlationId);
+            log.warn("{}, some reply message would be ignored and the request 
thread could be blocked.",  logMessage);
+            throw new IllegalArgumentException(logMessage);
+        }
+        return correlationId;
+    }
+    
+    
+    protected abstract ReplyHandler createReplyHandler(ReplyManager 
replyManager, Exchange exchange, AsyncCallback callback,
+                                String originalCorrelationId, String 
correlationId, long requestTimeout);
 
     public void onMessage(Message message) {
         String correlationID = null;

http://git-wip-us.apache.org/repos/asf/camel/blob/387f571c/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index 5f77591..2108ad4 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -57,15 +57,9 @@ public class TemporaryQueueReplyManager extends 
ReplyManagerSupport {
         return super.getReplyTo();
     }
     
-    public String registerReply(ReplyManager replyManager, Exchange exchange, 
AsyncCallback callback,
-                                String originalCorrelationId, String 
correlationId, long requestTimeout) {
-        // add to correlation map
-        TemporaryQueueReplyHandler handler = new 
TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, 
correlationId, requestTimeout);
-        ReplyHandler result = correlation.put(correlationId, handler, 
requestTimeout);
-        if (result != null) {
-            log.error("The correlationId [{}] is not unique, some reply 
message would be ignored and the request thread could be blocked.", 
correlationId);
-        }
-        return correlationId;
+    protected ReplyHandler createReplyHandler(ReplyManager replyManager, 
Exchange exchange, AsyncCallback callback,
+                                              String originalCorrelationId, 
String correlationId, long requestTimeout) {
+        return new TemporaryQueueReplyHandler(this, exchange, callback, 
originalCorrelationId, correlationId, requestTimeout);
     }
 
     public void updateCorrelationId(String correlationId, String 
newCorrelationId, long requestTimeout) {

http://git-wip-us.apache.org/repos/asf/camel/blob/387f571c/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java
 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java
index d535f95..634e975 100644
--- 
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java
+++ 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java
@@ -62,6 +62,41 @@ public class JmsRequestReplyCorrelationTest extends 
CamelTestSupport {
         assertEquals(REPLY_BODY, out.getOut().getBody(String.class));
         assertEquals("a", out.getOut().getHeader("JMSCorrelationID"));
     }
+    
+    /**
+     * As the correlationID should be unique when receiving the reply message, 
+     * now we just expect to get an exception here.
+     */
+    @Test
+    public void testRequestReplyCorrelationWithDuplicateId() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        // just send out the request to fill the correlation id first
+        template.asyncSend("jms:queue:helloDelay", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.setPattern(ExchangePattern.InOut);
+                Message in = exchange.getIn();
+                in.setBody("Hello World");
+                in.setHeader("JMSCorrelationID", "b");
+            }
+        });
+        
+        Exchange out = template.send("jms:queue:helloDelay", 
ExchangePattern.InOut, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                Message in = exchange.getIn();
+                in.setBody("Hello World");
+                in.setHeader("JMSCorrelationID", "b");
+            }
+        });
+
+        result.assertIsSatisfied();
+
+        assertNotNull("We are expecting the exception here!", 
out.getException());
+        assertTrue("Get a wrong exception", out.getException() instanceof 
IllegalArgumentException);
+        
+    }
+
 
     /**
      * When the setting useMessageIdAsCorrelationid is false and
@@ -211,6 +246,13 @@ public class JmsRequestReplyCorrelationTest extends 
CamelTestSupport {
                         
assertNotNull(exchange.getIn().getHeader("JMSReplyTo"));
                     }
                 }).to("mock:result");
+                
+                
from("jms:queue:helloDelay").delay().constant(2000).process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        exchange.getIn().setBody(REPLY_BODY);
+                        
assertNotNull(exchange.getIn().getHeader("JMSReplyTo"));
+                    }
+                }).to("mock:result");
             }
         };
     }

Reply via email to