Author: davsclaus Date: Thu Jan 20 13:43:38 2011 New Revision: 1061298 URL: http://svn.apache.org/viewvc?rev=1061298&view=rev Log: CAMEL-3462: JmsProducer in InOnly mode should only send JMSReplyTo if preserveMessageQos=true is configured on endpoint.
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToNotPreservedTest.java - copied, changed from r1061132, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=1061298&r1=1061297&r2=1061298&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Thu Jan 20 13:43:38 2011 @@ -216,26 +216,18 @@ public class JmsProducer extends Default // prefer to use destination over destination name destinationName = null; } - - // we must honor these special flags to preserve QoS - if (!endpoint.isPreserveMessageQos() && !endpoint.isExplicitQosEnabled()) { - Object replyTo = exchange.getIn().getHeader("JMSReplyTo"); - if (replyTo != null) { - // we are routing an existing JmsMessage, origin from another JMS endpoint - // then we need to remove the existing JMSReplyTo - // as we are not OUT capable and thus do not expect a reply, and therefore - // the consumer of this message should not return a reply - String to = destinationName != null ? destinationName : "" + destination; - LOG.warn("Disabling JMSReplyTo as this Exchange is not OUT capable with JMSReplyTo: " + replyTo - + " for destination: " + to + ". 
Use preserveMessageQos=true to force Camel to keep the JMSReplyTo on: " + exchange); - exchange.getIn().setHeader("JMSReplyTo", null); - } - } + final String to = destinationName != null ? destinationName : "" + destination; MessageCreator messageCreator = new MessageCreator() { public Message createMessage(Session session) throws JMSException { Message answer = endpoint.getBinding().makeJmsMessage(exchange, in, session, null); + // when in InOnly mode the JMSReplyTo is a bit complicated + // we only want to set the JMSReplyTo on the answer if + // there is a JMSReplyTo from the header/endpoint and + // we have been told to preserveMessageQos + + Object jmsReplyTo = answer.getJMSReplyTo(); if (endpoint.isDisableReplyTo()) { // honor disable reply to configuration if (LOG.isDebugEnabled()) { @@ -244,46 +236,68 @@ public class JmsProducer extends Default answer.setJMSReplyTo(null); } else { // if the binding did not create the reply to then we have to try to create it here - if (answer.getJMSReplyTo() == null) { + if (jmsReplyTo == null) { // prefer reply to from header over endpoint configured - String replyTo = exchange.getIn().getHeader("JMSReplyTo", String.class); - if (replyTo == null) { - replyTo = endpoint.getReplyTo(); + jmsReplyTo = exchange.getIn().getHeader("JMSReplyTo", String.class); + if (jmsReplyTo == null) { + jmsReplyTo = endpoint.getReplyTo(); } + } + } - if (replyTo != null) { - // must normalize the destination name - replyTo = normalizeDestinationName(replyTo); - - Destination destination = null; - // try using destination resolver to lookup the destination - if (endpoint.getDestinationResolver() != null) { - destination = endpoint.getDestinationResolver().resolveDestinationName(session, replyTo, endpoint.isPubSubDomain()); - } - if (destination == null) { - // okay then fallback and create the queue - if (endpoint.isPubSubDomain()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Creating JMSReplyTo topic: " + replyTo); - } - destination = 
session.createTopic(replyTo); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Creating JMSReplyTo queue: " + replyTo); - } - destination = session.createQueue(replyTo); - } + // we must honor these special flags to preserve QoS + // as we are not OUT capable and thus do not expect a reply, and therefore + // the consumer of this message should not return a reply so we remove it + // unless we use preserveMessageQos=true to tell that we still want to use JMSReplyTo + if (jmsReplyTo != null && !(endpoint.isPreserveMessageQos() || endpoint.isExplicitQosEnabled())) { + LOG.warn("Disabling JMSReplyTo: " + jmsReplyTo + " for destination: " + to + + ". Use preserveMessageQos=true to force Camel to keep the JMSReplyTo on endpoint: " + endpoint); + jmsReplyTo = null; + } + + // the reply to is a String, so we need to look up its Destination instance + // and if needed create the destination using the session if needed to + if (jmsReplyTo != null && jmsReplyTo instanceof String) { + // must normalize the destination name + String replyTo = normalizeDestinationName((String) jmsReplyTo); + // we need to null it as we use the String to resolve it as a Destination instance + jmsReplyTo = null; + + // try using destination resolver to lookup the destination + if (endpoint.getDestinationResolver() != null) { + jmsReplyTo = endpoint.getDestinationResolver().resolveDestinationName(session, replyTo, endpoint.isPubSubDomain()); + } + if (jmsReplyTo == null) { + // okay then fallback and create the queue + if (endpoint.isPubSubDomain()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating JMSReplyTo topic: " + replyTo); } - if (destination != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Using JMSReplyTo destination: " + destination); - } - answer.setJMSReplyTo(destination); + jmsReplyTo = session.createTopic(replyTo); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating JMSReplyTo queue: " + replyTo); } + jmsReplyTo = session.createQueue(replyTo); } } } + // set 
the JMSReplyTo on the answer if we are to use it + Destination replyTo = null; + if (jmsReplyTo instanceof Destination) { + replyTo = (Destination) jmsReplyTo; + } + if (replyTo != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Using JMSReplyTo destination: " + replyTo); + } + answer.setJMSReplyTo(replyTo); + } else { + // do not use JMSReplyTo + answer.setJMSReplyTo(null); + } + return answer; } }; @@ -397,4 +411,4 @@ public class JmsProducer extends Default protected void doStop() throws Exception { super.doStop(); } -} \ No newline at end of file +} Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToNotPreservedTest.java (from r1061132, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToNotPreservedTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToNotPreservedTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java&r1=1061132&r2=1061298&rev=1061298&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java (original) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToNotPreservedTest.java Thu Jan 20 13:43:38 2011 @@ -28,17 +28,20 @@ import static org.apache.camel.component /** * @version $Revision$ */ -public class JmsInOnlyWithReplyToTest extends CamelTestSupport { +public class JmsInOnlyWithReplyToNotPreservedTest extends CamelTestSupport { @Test public void testSendInOnlyWithReplyTo() throws Exception { getMockEndpoint("mock:foo").expectedBodiesReceived("World"); - 
getMockEndpoint("mock:bar").expectedBodiesReceived("Bye World"); getMockEndpoint("mock:done").expectedBodiesReceived("World"); template.sendBody("direct:start", "World"); assertMockEndpointsSatisfied(); + + // there should be no messages on the bar queue + Object msg = consumer.receiveBody("activemq:queue:bar", 1000); + assertNull("Should be no message on bar queue", msg); } protected CamelContext createCamelContext() throws Exception { @@ -60,11 +63,6 @@ public class JmsInOnlyWithReplyToTest ex from("activemq:queue:foo") .to("log:foo?showAll=true", "mock:foo") .transform(body().prepend("Bye ")); - - // we should disable reply to to avoid sending the message back to our self - // after we have consumed it - from("activemq:queue:bar?disableReplyTo=true") - .to("log:bar?showAll=true", "mock:bar"); } }; } Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java?rev=1061298&r1=1061297&r2=1061298&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java (original) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java Thu Jan 20 13:43:38 2011 @@ -54,7 +54,7 @@ public class JmsInOnlyWithReplyToTest ex @Override public void configure() throws Exception { from("direct:start") - .to("activemq:queue:foo?replyTo=queue:bar") + .to("activemq:queue:foo?replyTo=queue:bar&preserveMessageQos=true") .to("mock:done"); from("activemq:queue:foo")