Author: raulk Date: Fri Mar 22 00:24:06 2013 New Revision: 1459600 URL: http://svn.apache.org/r1459600 Log: CAMEL-6123 Fixed camel-jms: InOut exchange can time out even if response was received
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsChainedEndpointDelayTimeout.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java?rev=1459600&r1=1459599&r2=1459600&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java Fri Mar 22 00:24:06 2013 @@ -74,11 +74,8 @@ public class QueueReplyManager extends R } if (handler != null) { - try { - handler.onReply(correlationID, message); - } finally { - correlation.remove(correlationID); - } + correlation.remove(correlationID); + handler.onReply(correlationID, message); } else { // we could not correlate the received reply message to a matching request and therefore // we cannot continue routing the unknown message Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1459600&r1=1459599&r2=1459600&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Fri Mar 22 00:24:06 2013 @@ -82,11 +82,8 @@ public class TemporaryQueueReplyManager } if (handler != null) { - try { - handler.onReply(correlationID, message); - } finally { - correlation.remove(correlationID); - } + correlation.remove(correlationID); + handler.onReply(correlationID, message); } else { // we could not correlate the received reply message to a matching request and therefore // we cannot continue routing the unknown message Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsChainedEndpointDelayTimeout.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsChainedEndpointDelayTimeout.java?rev=1459600&view=auto ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsChainedEndpointDelayTimeout.java (added) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsChainedEndpointDelayTimeout.java Fri Mar 22 00:24:06 2013 @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms.issues; + +import javax.jms.ConnectionFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jms.CamelJmsTestHelper; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * Unit test to assert that timeouts don't trigger twice when JMS InOut endpoints are chained, and the second endpoint + * takes longer to respond than the timeout set for the first endpoint. + */ +public class JmsChainedEndpointDelayTimeout extends CamelTestSupport { + + @Test + public void testTimeoutNotTriggeredTempQueue() throws Exception { + getMockEndpoint("mock:exception").expectedMessageCount(0); + getMockEndpoint("mock:ping").expectedMessageCount(1); + template.requestBody("activemq:test", "<hello />"); + assertMockEndpointsSatisfied(); + } + + @Test + public void testTimeoutNotTriggeredFixedQueue() throws Exception { + getMockEndpoint("mock:exception").expectedMessageCount(0); + getMockEndpoint("mock:ping").expectedMessageCount(1); + template.requestBody("activemq:testReplyFixedQueue", "<hello />"); + assertMockEndpointsSatisfied(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + + onException(ExchangeTimedOutException.class) + .handled(true) + .to("mock:exception"); + + from("activemq:test") + .inOut("activemq:ping?requestTimeout=500") + .delay(constant(1000)); + + from("activemq:testReplyFixedQueue") + .inOut("activemq:ping?requestTimeout=500&replyToType=Exclusive&replyTo=reply") + .delay(constant(1000)); + + from("activemq:ping") + .to("mock:ping") + .log("pong"); + + } + }; + } +}