Updated Branches: refs/heads/camel-2.11.x 3b95b6c6d -> d5e7d2567 refs/heads/camel-2.12.x 873a4812f -> 78d7d05c1
CAMEL-7049: Fixed jms JMSReplyTo header with a topic did not work correctly. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78d7d05c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78d7d05c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78d7d05c Branch: refs/heads/camel-2.12.x Commit: 78d7d05c1b9f203d044b9268faef05221dbc9ef2 Parents: 873a481 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jan 28 09:04:21 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jan 28 09:28:49 2014 +0100 ---------------------------------------------------------------------- .../camel/component/jms/JmsMessageHelper.java | 28 ++++++++++++++++++++ .../apache/camel/component/jms/JmsProducer.java | 22 ++++++++------- 2 files changed, 41 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/78d7d05c/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java index 52f6af5..eb125b3 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java @@ -171,6 +171,34 @@ public final class JmsMessageHelper { } /** + * Whether the destination name has either queue or temp queue prefix. + * + * @param destination the destination + * @return <tt>true</tt> if queue or temp-queue prefix, <tt>false</tt> otherwise + */ + public static boolean isQueuePrefix(String destination) { + if (ObjectHelper.isEmpty(destination)) { + return false; + } + + return destination.startsWith(QUEUE_PREFIX) || destination.startsWith(TEMP_QUEUE_PREFIX); + } + + /** + * Whether the destination name has either topic or temp topic prefix. + * + * @param destination the destination + * @return <tt>true</tt> if topic or temp-topic prefix, <tt>false</tt> otherwise + */ + public static boolean isTopicPrefix(String destination) { + if (ObjectHelper.isEmpty(destination)) { + return false; + } + + return destination.startsWith(TOPIC_PREFIX) || destination.startsWith(TEMP_TOPIC_PREFIX); + } + + /** * Normalizes the destination name. * <p/> * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which http://git-wip-us.apache.org/repos/asf/camel/blob/78d7d05c/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java index 067b5bd..a30c4f3 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java @@ -44,6 +44,8 @@ import org.springframework.jms.core.JmsOperations; import org.springframework.jms.core.MessageCreator; import org.springframework.jms.support.JmsUtils; +import static org.apache.camel.component.jms.JmsMessageHelper.isQueuePrefix; +import static org.apache.camel.component.jms.JmsMessageHelper.isTopicPrefix; import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName; /** @@ -332,24 +334,26 @@ public class JmsProducer extends DefaultAsyncProducer { // the reply to is a String, so we need to look up its Destination instance // and if needed create the destination using the session if needed to if (jmsReplyTo != null && jmsReplyTo instanceof String) { - // must normalize the destination name - String before = (String) jmsReplyTo; - String replyTo = normalizeDestinationName(before); + String replyTo = (String) jmsReplyTo; // we need to null it as we use the String to resolve it as a Destination instance jmsReplyTo = null; - LOG.trace("Normalized JMSReplyTo destination name {} -> {}", before, replyTo); - + boolean isPubSub = isTopicPrefix(replyTo) || (!isQueuePrefix(replyTo) && endpoint.isPubSubDomain()); // try using destination resolver to lookup the destination if (endpoint.getDestinationResolver() != null) { - jmsReplyTo = endpoint.getDestinationResolver().resolveDestinationName(session, replyTo, endpoint.isPubSubDomain()); + jmsReplyTo = endpoint.getDestinationResolver().resolveDestinationName(session, replyTo, isPubSub); if (LOG.isDebugEnabled()) { LOG.debug("Resolved JMSReplyTo destination {} using DestinationResolver {} as PubSubDomain {} -> {}", - new Object[]{replyTo, endpoint.getDestinationResolver(), endpoint.isPubSubDomain(), jmsReplyTo}); + new Object[]{replyTo, endpoint.getDestinationResolver(), isPubSub, jmsReplyTo}); } } if (jmsReplyTo == null) { - // okay then fallback and create the queue - if (endpoint.isPubSubDomain()) { + // must normalize the destination name + String before = replyTo; + replyTo = normalizeDestinationName(replyTo); + LOG.trace("Normalized JMSReplyTo destination name {} -> {}", before, replyTo); + + // okay then fallback and create the queue/topic + if (isPubSub) { LOG.debug("Creating JMSReplyTo topic: {}", replyTo); jmsReplyTo = session.createTopic(replyTo); } else {