Updated Branches: refs/heads/master 46b5128fc -> 799f95cbb
CAMEL-7026: camel-jms - Allow a message to control the request timeout using a header Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/799f95cb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/799f95cb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/799f95cb Branch: refs/heads/master Commit: 799f95cbb616a44184a327b4a59870bd5c4c399f Parents: 46b5128 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Nov 29 14:38:20 2013 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Nov 29 14:39:24 2013 +0100 ---------------------------------------------------------------------- .../camel/component/jms/JmsConstants.java | 1 + .../apache/camel/component/jms/JmsProducer.java | 14 ++- .../camel/component/jms/reply/ReplyManager.java | 4 +- .../JmsInOutIndividualRequestTimeoutTest.java | 105 +++++++++++++++++++ 4 files changed, 118 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/799f95cb/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java index e96a817..a4f7b23 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java @@ -25,6 +25,7 @@ public final class JmsConstants { public static final String JMS_DESTINATION_NAME = "CamelJmsDestinationName"; public static final String JMS_MESSAGE_TYPE = "CamelJmsMessageType"; public static final String JMS_DELIVERY_MODE = "CamelJmsDeliveryMode"; + public static final String JMS_REQUEST_TIMEOUT = "CamelJmsRequestTimeout"; private JmsConstants() { // utility class http://git-wip-us.apache.org/repos/asf/camel/blob/799f95cb/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java index 096df8c..067b5bd 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java @@ -182,6 +182,9 @@ public class JmsProducer extends DefaultAsyncProducer { initReplyManager(); + // the request timeout can be overruled by a header otherwise the endpoint configured value is used + final long timeout = exchange.getIn().getHeader(JmsConstants.JMS_REQUEST_TIMEOUT, endpoint.getRequestTimeout(), long.class); + // when using message id as correlation id, we need at first to use a provisional correlation id // which we then update to the real JMSMessageID when the message has been sent // this is done with the help of the MessageSentCallback @@ -189,7 +192,7 @@ public class JmsProducer extends DefaultAsyncProducer { final String provisionalCorrelationId = msgIdAsCorrId ? getUuidGenerator().generateUuid() : null; MessageSentCallback messageSentCallback = null; if (msgIdAsCorrId) { - messageSentCallback = new UseMessageIdAsCorrelationIdMessageSentCallback(replyManager, provisionalCorrelationId, endpoint.getRequestTimeout()); + messageSentCallback = new UseMessageIdAsCorrelationIdMessageSentCallback(replyManager, provisionalCorrelationId, timeout); } final String originalCorrelationId = in.getHeader("JMSCorrelationID", String.class); @@ -209,13 +212,16 @@ public class JmsProducer extends DefaultAsyncProducer { if (replyTo == null) { throw new RuntimeExchangeException("Failed to resolve replyTo destination", exchange); } - LOG.debug("Using JMSReplyTo destination: {}", replyTo); JmsMessageHelper.setJMSReplyTo(answer, replyTo); replyManager.setReplyToSelectorHeader(in, answer); String correlationId = determineCorrelationId(answer, provisionalCorrelationId); - replyManager.registerReply(replyManager, exchange, callback, originalCorrelationId, correlationId, endpoint.getRequestTimeout()); - LOG.debug("Using JMSCorrelationID: {}", correlationId); + replyManager.registerReply(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout); + + if (LOG.isDebugEnabled()) { + LOG.debug("Using JMSCorrelationID: {}, JMSReplyTo destination: {}, with request timeout: {} ms.", + new Object[]{correlationId, replyTo, timeout}); + } LOG.trace("Created javax.jms.Message: {}", answer); return answer; http://git-wip-us.apache.org/repos/asf/camel/blob/799f95cb/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java index 85d1d95..9eb0085 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java @@ -69,7 +69,7 @@ public interface ReplyManager extends MessageListener { * @param callback the callback * @param originalCorrelationId an optional original correlation id * @param correlationId the correlation id to expect being used - * @param requestTimeout an optional timeout + * @param requestTimeout the timeout * @return the correlation id used */ String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, @@ -85,7 +85,7 @@ public interface ReplyManager extends MessageListener { * * @param correlationId the provisional correlation id * @param newCorrelationId the real correlation id - * @param requestTimeout an optional timeout + * @param requestTimeout the timeout */ void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout); http://git-wip-us.apache.org/repos/asf/camel/blob/799f95cb/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutIndividualRequestTimeoutTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutIndividualRequestTimeoutTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutIndividualRequestTimeoutTest.java new file mode 100644 index 0000000..366a672 --- /dev/null +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutIndividualRequestTimeoutTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import javax.jms.ConnectionFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * + */ +public class JmsInOutIndividualRequestTimeoutTest extends CamelTestSupport { + + protected String componentName = "activemq"; + + @Test + public void testOk() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel"); + + String out = template.requestBody("direct:start", "Camel", String.class); + assertEquals("Bye Camel", out); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testTimeout() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(0); + + try { + template.requestBodyAndHeader("direct:start", "World", JmsConstants.JMS_REQUEST_TIMEOUT, 1500L, String.class); + fail("Should have thrown exception"); + } catch (CamelExecutionException e) { + ExchangeTimedOutException timeout = assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause()); + assertEquals(1500, timeout.getTimeout()); + } + + assertMockEndpointsSatisfied(); + } + + @Test + public void testIndividualTimeout() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + String out = template.requestBodyAndHeader("direct:start", "World", JmsConstants.JMS_REQUEST_TIMEOUT, 8000L, String.class); + assertEquals("Bye World", out); + + assertMockEndpointsSatisfied(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent(componentName, jmsComponentAutoAcknowledge(connectionFactory)); + + return camelContext; + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("direct:start") + .inOut("activemq:queue:foo?replyTo=queue:bar&requestTimeout=2000") + .to("mock:result"); + + from("activemq:queue:foo") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + String body = exchange.getIn().getBody(String.class); + if ("World".equals(body)) { + log.debug("Sleeping for 4 sec to force a timeout"); + Thread.sleep(4000); + } + } + }).transform(body().prepend("Bye ")).to("log:reply"); + } + }; + } + +}