This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch jms-sent in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9cf9a6788770049924539812188c83b4c6663e41 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Jan 25 15:27:36 2024 +0100 CAMEL-20338: camel-jms - Store header with actual JMS destination name such as when sending to dynamic computed queues. --- .../org/apache/camel/component/jms/jms.json | 31 +++++---- .../org/apache/camel/component/jms/JmsBinding.java | 3 +- .../apache/camel/component/jms/JmsComponent.java | 4 +- .../apache/camel/component/jms/JmsConstants.java | 2 + .../camel/component/jms/JmsMessageHelper.java | 21 ++++++ .../apache/camel/component/jms/JmsProducer.java | 15 +++- .../component/jms/reply/ReplyManagerSupport.java | 5 ++ .../jms/JmsDestinationProducedHeaderTest.java | 79 ++++++++++++++++++++++ 8 files changed, 140 insertions(+), 20 deletions(-) diff --git a/components/camel-jms/src/generated/resources/org/apache/camel/component/jms/jms.json b/components/camel-jms/src/generated/resources/org/apache/camel/component/jms/jms.json index 4d0f143e6e3..5f610be423d 100644 --- a/components/camel-jms/src/generated/resources/org/apache/camel/component/jms/jms.json +++ b/components/camel-jms/src/generated/resources/org/apache/camel/component/jms/jms.json @@ -127,21 +127,22 @@ "headers": { "CamelJmsDestination": { "index": 0, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "jakarta.jms.Destination", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The destination.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_DESTINATION" }, "CamelJmsDestinationName": { "index": 1, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the queue or topic to use as destination.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_DESTINATION_NAME" }, - "JMSXGroupID": { "index": 2, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS group ID.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_X_GROUP_ID" }, - "JMSMessageID": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS unique message ID.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_MESSAGE_ID" }, - "JMSCorrelationID": { "index": 4, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS correlation ID.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_CORRELATION_ID" }, - "JMSCorrelationIDAsBytes": { "index": 5, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "byte[]", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS correlation ID as bytes.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_CORRELATION_ID_AS_BYTES" }, - "JMSDeliveryMode": { "index": 6, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS delivery mode.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_DELIVERY_MODE" }, - "JMSDestination": { "index": 7, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "jakarta.jms.Destination", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS destination.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_DESTINATION" }, - "JMSExpiration": { "index": 8, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS expiration.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_EXPIRATION" }, - "JMSPriority": { "index": 9, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS priority (with 0 as the lowest priority and 9 as the highest).", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_PRIORITY" }, - "JMSRedelivered": { "index": 10, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Is the JMS message redelivered.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_REDELIVERED" }, - "JMSTimestamp": { "index": 11, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS timestamp.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_TIMESTAMP" }, - "JMSReplyTo": { "index": 12, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "jakarta.jms.Destination", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS reply-to destination.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_REPLY_TO" }, - "JMSType": { "index": 13, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS type.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_TYPE" }, - "JMSXUserID": { "index": 14, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The XUser id.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_XUSER_ID" }, - "CamelJmsMessageType": { "index": 15, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "org.apache.camel.component.jms.JmsMessageType", "enum": [ "Bytes", "Map", "Object", "Stream", "Text", "Blob" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The message type.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_MESSAGE_TYPE" }, - "CamelJmsRequestTimeout": { "index": 16, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": "20_000", "description": "The timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds).", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_REQUEST_TIMEOUT" } + "CamelJMSDestinationProduced": { "index": 2, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the queue or topic the message was sent to.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_DESTINATION_NAME_PRODUCED" }, + "JMSXGroupID": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS group ID.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_X_GROUP_ID" }, + "JMSMessageID": { "index": 4, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS unique message ID.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_MESSAGE_ID" }, + "JMSCorrelationID": { "index": 5, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS correlation ID.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_CORRELATION_ID" }, + "JMSCorrelationIDAsBytes": { "index": 6, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "byte[]", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS correlation ID as bytes.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_CORRELATION_ID_AS_BYTES" }, + "JMSDeliveryMode": { "index": 7, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS delivery mode.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_DELIVERY_MODE" }, + "JMSDestination": { "index": 8, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "jakarta.jms.Destination", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS destination.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_DESTINATION" }, + "JMSExpiration": { "index": 9, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS expiration.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_EXPIRATION" }, + "JMSPriority": { "index": 10, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS priority (with 0 as the lowest priority and 9 as the highest).", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_PRIORITY" }, + "JMSRedelivered": { "index": 11, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Is the JMS message redelivered.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_REDELIVERED" }, + "JMSTimestamp": { "index": 12, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS timestamp.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_TIMESTAMP" }, + "JMSReplyTo": { "index": 13, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "jakarta.jms.Destination", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS reply-to destination.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_REPLY_TO" }, + "JMSType": { "index": 14, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The JMS type.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_TYPE" }, + "JMSXUserID": { "index": 15, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The XUser id.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_HEADER_XUSER_ID" }, + "CamelJmsMessageType": { "index": 16, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "org.apache.camel.component.jms.JmsMessageType", "enum": [ "Bytes", "Map", "Object", "Stream", "Text", "Blob" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The message type.", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_MESSAGE_TYPE" }, + "CamelJmsRequestTimeout": { "index": 17, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": "20_000", "description": "The timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds).", "constantName": "org.apache.camel.component.jms.JmsConstants#JMS_REQUEST_TIMEOUT" } }, "properties": { "destinationType": { "index": 0, "kind": "path", "displayName": "Destination Type", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "queue", "topic", "temp-queue", "temp-topic" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "queue", "description": "The kind of destination to use" }, diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java index a4289e5d108..91d974abf1f 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java @@ -180,7 +180,8 @@ public class JmsBinding { try { map.put(JmsConstants.JMS_HEADER_CORRELATION_ID, JmsMessageHelper.getJMSCorrelationID(jmsMessage)); if (endpoint == null || endpoint.getComponent().isIncludeCorrelationIDAsBytes()) { - map.put(JmsConstants.JMS_HEADER_CORRELATION_ID_AS_BYTES, JmsMessageHelper.getJMSCorrelationIDAsBytes(jmsMessage)); + map.put(JmsConstants.JMS_HEADER_CORRELATION_ID_AS_BYTES, + JmsMessageHelper.getJMSCorrelationIDAsBytes(jmsMessage)); } map.put(JmsConstants.JMS_HEADER_DELIVERY_MODE, jmsMessage.getJMSDeliveryMode()); map.put(JmsConstants.JMS_HEADER_DESTINATION, jmsMessage.getJMSDestination()); diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java index 746a5da7c2d..1240848bc38 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java @@ -60,8 +60,8 @@ public class JmsComponent extends HeaderFilterStrategyComponent { @Metadata(label = "advanced", description = "To use a shared JMS configuration") private JmsConfiguration configuration; @Metadata(label = "advanced", - description = "Whether the JMS consumer should include JMSCorrelationIDAsBytes as a header on the Camel Message.", - defaultValue = "true") + description = "Whether the JMS consumer should include JMSCorrelationIDAsBytes as a header on the Camel Message.", + defaultValue = "true") private boolean includeCorrelationIDAsBytes = true; @Metadata(label = "advanced", description = "To use a custom QueueBrowseStrategy when browsing queues") private QueueBrowseStrategy queueBrowseStrategy; diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java index 607bbf1c6a4..e23b2af3575 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java @@ -27,6 +27,8 @@ public final class JmsConstants { public static final String JMS_DESTINATION = "CamelJmsDestination"; @Metadata(label = "producer", description = "The name of the queue or topic to use as destination.", javaType = "String") public static final String JMS_DESTINATION_NAME = "CamelJmsDestinationName"; + @Metadata(description = "The name of the queue or topic the message was sent to.", javaType = "String") + public static final String JMS_DESTINATION_NAME_PRODUCED = "CamelJMSDestinationProduced"; @Metadata(description = "The JMS group ID.", javaType = "String") public static final String JMS_X_GROUP_ID = "JMSXGroupID"; @Metadata(description = "The JMS unique message ID.", javaType = "String") diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java index c82bd77b67c..1349118a623 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java @@ -25,6 +25,8 @@ import jakarta.jms.DeliveryMode; import jakarta.jms.Destination; import jakarta.jms.JMSException; import jakarta.jms.Message; +import jakarta.jms.Queue; +import jakarta.jms.Topic; import org.apache.camel.Exchange; import org.apache.camel.support.ExchangeHelper; @@ -484,4 +486,23 @@ public final class JmsMessageHelper { } return null; } + + /** + * Gets the queue or topic name. + * + * @param destination the JMS destination + * @return the name, or <tt>null</tt> if not possible to get the name + */ + public static String getDestinationName(Destination destination) { + try { + if (destination instanceof Queue q) { + return q.getQueueName(); + } else if (destination instanceof Topic t) { + return t.getTopicName(); + } + } catch (JMSException e) { + // ignore + } + return null; + } } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java index 5e5b60bf510..ca330c957aa 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java @@ -49,6 +49,7 @@ import org.springframework.jms.core.MessageCreator; import org.springframework.jms.support.JmsUtils; import static java.util.Optional.ofNullable; +import static org.apache.camel.component.jms.JmsMessageHelper.getDestinationName; import static org.apache.camel.component.jms.JmsMessageHelper.isQueuePrefix; import static org.apache.camel.component.jms.JmsMessageHelper.isTopicPrefix; import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName; @@ -217,11 +218,12 @@ public class JmsProducer extends DefaultAsyncProducer { in.setHeader(correlationPropertyToUse, GENERATED_CORRELATION_ID_PREFIX + getUuidGenerator().generateUuid()); } + final String to = destinationName != null ? destinationName : getDestinationName(destination); MessageCreator messageCreator = new MessageCreator() { public Message createMessage(Session session) throws JMSException { Message answer = endpoint.getBinding().makeJmsMessage(exchange, in, session, null); - Destination replyTo = null; + Destination replyTo; String replyToOverride = configuration.getReplyToOverride(); if (replyToOverride != null) { replyTo = resolveOrCreateDestination(replyToOverride, session); @@ -254,6 +256,11 @@ public class JmsProducer extends DefaultAsyncProducer { doSend(true, destinationName, destination, messageCreator, messageSentCallback); + // record where we sent the message + if (to != null) { + exchange.getMessage().setHeader(JmsConstants.JMS_DESTINATION_NAME_PRODUCED, to); + } + // continue routing asynchronously (reply will be processed async when its received) return false; } @@ -316,7 +323,7 @@ public class JmsProducer extends DefaultAsyncProducer { // prefer to use destination over destination name destinationName = null; } - final String to = destinationName != null ? destinationName : String.valueOf(destination); + final String to = destinationName != null ? destinationName : getDestinationName(destination); MessageSentCallback messageSentCallback = getEndpoint().getConfiguration().isIncludeSentJMSMessageID() ? new InOnlyMessageSentCallback(exchange) : null; @@ -393,6 +400,10 @@ public class JmsProducer extends DefaultAsyncProducer { // after sending then set the OUT message id to the JMSMessageID so its identical setMessageId(exchange); + // record where we sent the message + if (to != null) { + exchange.getMessage().setHeader(JmsConstants.JMS_DESTINATION_NAME_PRODUCED, to); + } // we are synchronous so return true callback.done(true); diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index dbdf3f3d95a..ef91941f669 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -169,6 +169,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl if (holder != null && isRunAllowed()) { try { Exchange exchange = holder.getExchange(); + Object to = exchange.getIn().getHeader(JmsConstants.JMS_DESTINATION_NAME_PRODUCED); boolean timeout = holder.isTimeout(); if (timeout) { @@ -194,6 +195,10 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl // to everything it may need, and can populate headers, properties, etc. accordingly (solves CAMEL-6218). exchange.setOut(response); Object body = response.getBody(); + // store where the request message was sent to, so we know that also + if (to != null) { + response.setHeader(JmsConstants.JMS_DESTINATION_NAME_PRODUCED, to); + } if (endpoint.isTransferException() && body instanceof Exception) { log.debug("Reply was an Exception. Setting the Exception on the Exchange: {}", body); diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDestinationProducedHeaderTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDestinationProducedHeaderTest.java new file mode 100644 index 00000000000..5889d242f2c --- /dev/null +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDestinationProducedHeaderTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class JmsDestinationProducedHeaderTest extends AbstractPersistentJMSTest { + + @Test + public void testToD() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(2); + mock.message(0).header(JmsConstants.JMS_DESTINATION_NAME_PRODUCED).isEqualTo("JmsDestinationProducedHeaderTest.bar2"); + mock.message(1).header(JmsConstants.JMS_DESTINATION_NAME_PRODUCED).isEqualTo("JmsDestinationProducedHeaderTest.beer2"); + + template.sendBodyAndHeader("direct:start", "Hello bar", "where", "JmsDestinationProducedHeaderTest.bar2"); + template.sendBodyAndHeader("direct:start", "Hello beer", "where", "JmsDestinationProducedHeaderTest.beer2"); + + MockEndpoint.assertIsSatisfied(context); + + // there should only be two activemq endpoint + long count = context.getEndpoints().stream().filter(e -> e.getEndpointUri().startsWith("activemq:")).count(); + assertEquals(2, count, "There should only be 1 activemq endpoint"); + + // and the messages should be in the queues + String out = consumer.receiveBody("activemq:queue:JmsDestinationProducedHeaderTest.bar2", 2000, String.class); + assertEquals("Hello bar", out); + out = consumer.receiveBody("activemq:queue:JmsDestinationProducedHeaderTest.beer2", 2000, String.class); + assertEquals("Hello beer", out); + } + + @Test + public void testToDInOut() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.message(0).header(JmsConstants.JMS_DESTINATION_NAME_PRODUCED).isEqualTo("JmsDestinationProducedHeaderTest.echo"); + + String out = fluentTemplate.to("direct:start").withBody("Camel") + .withHeader("where", "JmsDestinationProducedHeaderTest.echo") + .request(String.class); + assertEquals("CamelCamel", out); + + MockEndpoint.assertIsSatisfied(context); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + // route message dynamic using toD + from("direct:start") + .toD("activemq:queue:${header.where}") + .to("mock:result"); + + from("activemq:queue:JmsDestinationProducedHeaderTest.echo") + .setBody(simple("${body}${body}")); + } + }; + } +}