Author: davsclaus Date: Tue Oct 25 07:57:46 2011 New Revision: 1188558 URL: http://svn.apache.org/viewvc?rev=1188558&view=rev Log: CAMEL-3632: Added asyncConsumer option to camel-jms. This allows the JmsConsumer to process the exchange async to better scale.
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerFalseTest.java camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTest.java camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTwoTest.java camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java (with props) Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsJettyAsyncTest.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=1188558&r1=1188557&r2=1188558&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Tue Oct 25 07:57:46 2011 @@ -22,6 +22,8 @@ import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; @@ -29,6 +31,8 @@ import org.apache.camel.RollbackExchange import org.apache.camel.RuntimeCamelException; import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.util.AsyncProcessorConverterHelper; +import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,18 +52,18 @@ import static org.apache.camel.util.Obje public class EndpointMessageListener implements MessageListener { private static final transient Logger LOG = LoggerFactory.getLogger(EndpointMessageListener.class); private ExceptionHandler exceptionHandler; - private JmsEndpoint endpoint; - private Processor processor; + private final JmsEndpoint endpoint; + private final AsyncProcessor processor; private JmsBinding binding; private boolean eagerLoadingOfProperties; private Object replyToDestination; private JmsOperations template; private boolean disableReplyTo; + private boolean async; public EndpointMessageListener(JmsEndpoint endpoint, Processor processor) { this.endpoint = endpoint; - this.processor = processor; - endpoint.getConfiguration().configure(this); + this.processor = AsyncProcessorConverterHelper.convert(processor); } public void onMessage(final Message message) { @@ -68,7 +72,7 @@ public class EndpointMessageListener imp LOG.debug("{} consumer received JMS message: {}", endpoint, message); boolean sendReply; - RuntimeCamelException rce = null; + RuntimeCamelException rce; try { Object replyDestination = getReplyToDestination(message); // we can only send back a reply if there was a reply destination configured @@ -84,13 +88,74 @@ public class EndpointMessageListener imp LOG.debug("Received Message has JMSCorrelationID [" + correlationId + "]"); } - // process the exchange + // process the exchange either asynchronously or synchronous LOG.trace("onMessage.process START"); - try { - processor.process(exchange); - } catch (Throwable e) { - exchange.setException(e); + AsyncCallback callback = new EndpointMessageListenerAsyncCallback(message, exchange, endpoint, sendReply, replyDestination); + + // async is by default false, which mean we by default will process the exchange synchronously + // to keep backwards compatible, as well ensure this consumer will pickup messages in order + // (eg to not consume the next message before the previous has been fully processed) + // but if end user explicit configure consumerAsync=true, then we can process the message + // asynchronously (unless endpoint has been configured synchronous, or we use transaction) + boolean forceSync = endpoint.isSynchronous() || endpoint.isTransacted(); + if (forceSync || !isAsync()) { + // must process synchronous if transacted or configured to do so + LOG.trace("Processing exchange {} synchronously", exchange.getExchangeId()); + try { + processor.process(exchange); + } catch (Exception e) { + exchange.setException(e); + } finally { + callback.done(true); + } + } else { + // process asynchronous using the async routing engine + LOG.trace("Processing exchange {} asynchronously", exchange.getExchangeId()); + boolean sync = AsyncProcessorHelper.process(processor, exchange, callback); + if (!sync) { + // will be done async so return now + return; + } } + // if we failed processed the exchange from the async callback task, then grab the exception + rce = exchange.getException(RuntimeCamelException.class); + + } catch (Exception e) { + rce = wrapRuntimeCamelException(e); + } + + // an exception occurred so rethrow to trigger rollback on JMS listener + if (rce != null) { + handleException(rce); + LOG.trace("onMessage END throwing exception: {}", rce.getMessage()); + throw rce; + } + + LOG.trace("onMessage END"); + } + + /** + * Callback task that is performed when the exchange has been processed + */ + private final class EndpointMessageListenerAsyncCallback implements AsyncCallback { + + private final Message message; + private final Exchange exchange; + private final JmsEndpoint endpoint; + private final boolean sendReply; + private final Object replyDestination; + + private EndpointMessageListenerAsyncCallback(Message message, Exchange exchange, JmsEndpoint endpoint, + boolean sendReply, Object replyDestination) { + this.message = message; + this.exchange = exchange; + this.endpoint = endpoint; + this.sendReply = sendReply; + this.replyDestination = replyDestination; + } + + @Override + public void done(boolean doneSync) { LOG.trace("onMessage.process END"); // now we evaluate the processing of the exchange and determine if it was a success or failure @@ -100,6 +165,7 @@ public class EndpointMessageListener imp // if we send back a reply it can either be the message body or transferring a caused exception org.apache.camel.Message body = null; Exception cause = null; + RuntimeCamelException rce = null; if (exchange.isFailed() || exchange.isRollbackOnly()) { if (exchange.isRollbackOnly()) { @@ -140,18 +206,18 @@ public class EndpointMessageListener imp LOG.trace("onMessage.sendReply END"); } - } catch (Exception e) { - rce = wrapRuntimeCamelException(e); - } - - // an exception occurred so rethrow to trigger rollback on JMS listener - if (rce != null) { - handleException(rce); - LOG.trace("onMessage END throwing exception: {}", rce.getMessage()); - throw rce; + // if an exception occurred + if (rce != null) { + if (doneSync) { + // we were done sync, so put exception on exchange, so we can grab it in the onMessage + // method and rethrow it + exchange.setException(rce); + } else { + // we were done async, so use the Camel built in exception handler to deal with it + handleException(rce); + } + } } - - LOG.trace("onMessage END"); } public Exchange createExchange(Message message, Object replyDestination) { @@ -245,6 +311,20 @@ public class EndpointMessageListener imp this.replyToDestination = replyToDestination; } + public boolean isAsync() { + return async; + } + + /** + * Sets whether asynchronous routing is enabled. + * <p/> + * By default this is <tt>false</tt>. If configured as <tt>true</tt> then + * this listener will process the {@link org.apache.camel.Exchange} asynchronous. + */ + public void setAsync(boolean async) { + this.async = async; + } + // Implementation methods //------------------------------------------------------------------------- @@ -327,4 +407,8 @@ public class EndpointMessageListener imp getExceptionHandler().handleException(t); } + @Override + public String toString() { + return "EndpointMessageListener[" + endpoint + "]"; + } } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1188558&r1=1188557&r2=1188558&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Tue Oct 25 07:57:46 2011 @@ -348,6 +348,14 @@ public class JmsComponent extends Defaul getConfiguration().setPreserveMessageQos(preserveMessageQos); } + public void setAsyncConsumer(boolean asyncConsumer) { + configuration.setAsyncConsumer(asyncConsumer); + } + + public boolean isAsyncConsumer() { + return configuration.isAsyncConsumer(); + } + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1188558&r1=1188557&r2=1188558&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Tue Oct 25 07:57:46 2011 @@ -125,6 +125,7 @@ public class JmsConfiguration implements // to force disabling time to live (works in both in-only or in-out mode) private boolean disableTimeToLive; private ReplyToType replyToType; + private boolean asyncConsumer; public JmsConfiguration() { } @@ -971,7 +972,7 @@ public class JmsConfiguration implements } } - public void configure(EndpointMessageListener listener) { + public void configureMessageListener(EndpointMessageListener listener) { if (isDisableReplyTo()) { listener.setDisableReplyTo(true); } @@ -1192,4 +1193,19 @@ public class JmsConfiguration implements public void setReplyToType(ReplyToType replyToType) { this.replyToType = replyToType; } + + public boolean isAsyncConsumer() { + return asyncConsumer; + } + + /** + * Sets whether asynchronous routing is enabled on {@link JmsConsumer}. + * <p/> + * By default this is <tt>false</tt>. If configured as <tt>true</tt> then + * the {@link JmsConsumer} will process the {@link org.apache.camel.Exchange} asynchronous. + */ + public void setAsyncConsumer(boolean asyncConsumer) { + this.asyncConsumer = asyncConsumer; + } + } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1188558&r1=1188557&r2=1188558&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Tue Oct 25 07:57:46 2011 @@ -64,7 +64,9 @@ public class JmsConsumer extends Default protected void createMessageListener(JmsEndpoint endpoint, Processor processor) { messageListener = new EndpointMessageListener(endpoint, processor); + getEndpoint().getConfiguration().configureMessageListener(messageListener); messageListener.setBinding(endpoint.getBinding()); + messageListener.setAsync(endpoint.getConfiguration().isAsyncConsumer()); } protected void createMessageListenerContainer() throws Exception { Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1188558&r1=1188557&r2=1188558&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Tue Oct 25 07:57:46 2011 @@ -1014,6 +1014,16 @@ public class JmsEndpoint extends Default } @ManagedAttribute + public void setAsyncConsumer(boolean asyncConsumer) { + configuration.setAsyncConsumer(asyncConsumer); + } + + @ManagedAttribute + public boolean isAsyncConsumer() { + return configuration.isAsyncConsumer(); + } + + @ManagedAttribute public String getReplyToType() { if (configuration.getReplyToType() != null) { return configuration.getReplyToType().name(); Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerFalseTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerFalseTest.java?rev=1188558&view=auto ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerFalseTest.java (added) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerFalseTest.java Tue Oct 25 07:57:46 2011 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms.async; + +import javax.jms.ConnectionFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jms.CamelJmsTestHelper; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * + */ +public class AsyncConsumerFalseTest extends CamelTestSupport { + + @Test + public void testAsyncJmsConsumer() throws Exception { + // async is disabled (so we should receive in same order) + getMockEndpoint("mock:result").expectedBodiesReceived("Camel", "Hello World"); + + template.sendBody("activemq:queue:start", "Hello Camel"); + template.sendBody("activemq:queue:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + camelContext.addComponent("async", new MyAsyncComponent()); + + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // disable async in only mode on the consumer + from("activemq:queue:start?asyncConsumer=false") + .choice() + .when(body().contains("Camel")) + .to("async:camel?delay=2000") + .to("mock:result") + .otherwise() + .to("mock:result"); + } + }; + } +} Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTest.java?rev=1188558&view=auto ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTest.java (added) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTest.java Tue Oct 25 07:57:46 2011 @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms.async; + +import javax.jms.ConnectionFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jms.CamelJmsTestHelper; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * + */ +public class AsyncConsumerInOutTest extends CamelTestSupport { + + @Test + public void testAsyncJmsConsumer() throws Exception { + // Hello World is received first despite its send last + // the reason is that the first message is processed asynchronously + // and it takes 2 sec to complete, so in between we have time to + // process the 2nd message on the queue + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye Camel"); + + template.sendBody("activemq:queue:start", "Hello Camel"); + template.sendBody("activemq:queue:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + camelContext.addComponent("async", new MyAsyncComponent()); + + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // enable async in only mode on the consumer + from("activemq:queue:start?asyncConsumer=true") + .choice() + .when(body().contains("Camel")) + .to("async:camel?delay=2000") + .inOut("activemq:queue:camel") + .to("mock:result") + .otherwise() + .to("log:other") + .to("mock:result"); + + from("activemq:queue:camel") + .to("log:camel") + .transform(constant("Bye Camel")); + } + }; + } +} Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTwoTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTwoTest.java?rev=1188558&view=auto ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTwoTest.java (added) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTwoTest.java Tue Oct 25 07:57:46 2011 @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms.async; + +import javax.jms.ConnectionFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jms.CamelJmsTestHelper; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * + */ +public class AsyncConsumerInOutTwoTest extends CamelTestSupport { + + @Test + public void testAsyncJmsConsumer() throws Exception { + String out = template.requestBody("activemq:queue:start", "Hello World", String.class); + assertEquals("Bye World", out); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + camelContext.addComponent("async", new MyAsyncComponent()); + + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // enable async in only mode on the consumer + from("activemq:queue:start?asyncConsumer=true") + .to("async:camel?delay=2000") + .transform(constant("Bye World")); + } + }; + } +} Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java?rev=1188558&view=auto ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java (added) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java Tue Oct 25 07:57:46 2011 @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms.async; + +import javax.jms.ConnectionFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jms.CamelJmsTestHelper; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * + */ +public class AsyncConsumerTest extends CamelTestSupport { + + @Test + public void testAsyncJmsConsumer() throws Exception { + // Hello World is received first despite its send last + // the reason is that the first message is processed asynchronously + // and it takes 2 sec to complete, so in between we have time to + // process the 2nd message on the queue + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Camel"); + + template.sendBody("activemq:queue:start", "Hello Camel"); + template.sendBody("activemq:queue:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + camelContext.addComponent("async", new MyAsyncComponent()); + + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // enable async in only mode on the consumer + from("activemq:queue:start?asyncConsumer=true") + .choice() + .when(body().contains("Camel")) + .to("async:camel?delay=2000") + .to("mock:result") + .otherwise() + .to("mock:result"); + } + }; + } +} Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java?rev=1188558&r1=1188557&r2=1188558&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java (original) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java Tue Oct 25 07:57:46 2011 @@ -19,8 +19,6 @@ package org.apache.camel.component.jms.i import javax.jms.ConnectionFactory; import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.jms.CamelJmsTestHelper; import org.apache.camel.test.junit4.CamelTestSupport; @@ -47,11 +45,8 @@ public class JmsJMSReplyToConsumerEndpoi return new RouteBuilder() { public void configure() throws Exception { from("activemq:queue:hello?replyTo=queue:namedReplyQueue") - .process(new Processor() { - public void process(Exchange exchange) throws Exception { - exchange.getOut().setBody("My name is Camel"); - } - }); + .to("log:hello") + .transform(constant("My name is Camel")); } }; } Modified: camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsJettyAsyncTest.java URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsJettyAsyncTest.java?rev=1188558&r1=1188557&r2=1188558&view=diff ============================================================================== --- camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsJettyAsyncTest.java (original) +++ camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsJettyAsyncTest.java Tue Oct 25 07:57:46 2011 @@ -31,8 +31,7 @@ import org.junit.Test; */ public class JmsJettyAsyncTest extends CamelTestSupport { - // TODO: When async jms consumer is implemented we can bump this value to 1000 - private int size = 10; + private int size = 100; private int port; @Test @@ -54,7 +53,8 @@ public class JmsJettyAsyncTest extends C return new RouteBuilder() { @Override public void configure() throws Exception { - from("activemq:queue:inbox?synchronous=false") + // enable async consumer to process messages faster + from("activemq:queue:inbox?asyncConsumer=false") .to("jetty:http://0.0.0.0:" + port + "/myapp") .to("log:result?groupSize=10", "mock:result");