Author: davsclaus
Date: Tue Oct 25 07:57:46 2011
New Revision: 1188558

URL: http://svn.apache.org/viewvc?rev=1188558&view=rev
Log:
CAMEL-3632: Added asyncConsumer option to camel-jms. This allows the 
JmsConsumer to process the exchange async to better scale.

Added:
    
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerFalseTest.java
    
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTest.java
    
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTwoTest.java
    
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java
   (with props)
Modified:
    
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
    
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
    
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java
    
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsJettyAsyncTest.java

Modified: 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=1188558&r1=1188557&r2=1188558&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
 (original)
+++ 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
 Tue Oct 25 07:57:46 2011
@@ -22,6 +22,8 @@ import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
@@ -29,6 +31,8 @@ import org.apache.camel.RollbackExchange
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.util.AsyncProcessorConverterHelper;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,18 +52,18 @@ import static org.apache.camel.util.Obje
 public class EndpointMessageListener implements MessageListener {
     private static final transient Logger LOG = 
LoggerFactory.getLogger(EndpointMessageListener.class);
     private ExceptionHandler exceptionHandler;
-    private JmsEndpoint endpoint;
-    private Processor processor;
+    private final JmsEndpoint endpoint;
+    private final AsyncProcessor processor;
     private JmsBinding binding;
     private boolean eagerLoadingOfProperties;
     private Object replyToDestination;
     private JmsOperations template;
     private boolean disableReplyTo;
+    private boolean async;
 
     public EndpointMessageListener(JmsEndpoint endpoint, Processor processor) {
         this.endpoint = endpoint;
-        this.processor = processor;
-        endpoint.getConfiguration().configure(this);
+        this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
 
     public void onMessage(final Message message) {
@@ -68,7 +72,7 @@ public class EndpointMessageListener imp
         LOG.debug("{} consumer received JMS message: {}", endpoint, message);
 
         boolean sendReply;
-        RuntimeCamelException rce = null;
+        RuntimeCamelException rce;
         try {
             Object replyDestination = getReplyToDestination(message);
             // we can only send back a reply if there was a reply destination 
configured
@@ -84,13 +88,74 @@ public class EndpointMessageListener imp
                 LOG.debug("Received Message has JMSCorrelationID [" + 
correlationId + "]");
             }
 
-            // process the exchange
+            // process the exchange either asynchronously or synchronous
             LOG.trace("onMessage.process START");
-            try {
-                processor.process(exchange);
-            } catch (Throwable e) {
-                exchange.setException(e);
+            AsyncCallback callback = new 
EndpointMessageListenerAsyncCallback(message, exchange, endpoint, sendReply, 
replyDestination);
+
+            // async is by default false, which mean we by default will 
process the exchange synchronously
+            // to keep backwards compatible, as well ensure this consumer will 
pickup messages in order
+            // (eg to not consume the next message before the previous has 
been fully processed)
+            // but if end user explicit configure consumerAsync=true, then we 
can process the message
+            // asynchronously (unless endpoint has been configured 
synchronous, or we use transaction)
+            boolean forceSync = endpoint.isSynchronous() || 
endpoint.isTransacted();
+            if (forceSync || !isAsync()) {
+                // must process synchronous if transacted or configured to do 
so
+                LOG.trace("Processing exchange {} synchronously", 
exchange.getExchangeId());
+                try {
+                    processor.process(exchange);
+                } catch (Exception e) {
+                    exchange.setException(e);
+                } finally {
+                    callback.done(true);
+                }
+            } else {
+                // process asynchronous using the async routing engine
+                LOG.trace("Processing exchange {} asynchronously", 
exchange.getExchangeId());
+                boolean sync = AsyncProcessorHelper.process(processor, 
exchange, callback);
+                if (!sync) {
+                    // will be done async so return now
+                    return;
+                }
             }
+            // if we failed processed the exchange from the async callback 
task, then grab the exception
+            rce = exchange.getException(RuntimeCamelException.class);
+
+        } catch (Exception e) {
+            rce = wrapRuntimeCamelException(e);
+        }
+
+        // an exception occurred so rethrow to trigger rollback on JMS listener
+        if (rce != null) {
+            handleException(rce);
+            LOG.trace("onMessage END throwing exception: {}", 
rce.getMessage());
+            throw rce;
+        }
+
+        LOG.trace("onMessage END");
+    }
+
+    /**
+     * Callback task that is performed when the exchange has been processed
+     */
+    private final class EndpointMessageListenerAsyncCallback implements 
AsyncCallback {
+
+        private final Message message;
+        private final Exchange exchange;
+        private final JmsEndpoint endpoint;
+        private final boolean sendReply;
+        private final Object replyDestination;
+
+        private EndpointMessageListenerAsyncCallback(Message message, Exchange 
exchange, JmsEndpoint endpoint,
+                                                     boolean sendReply, Object 
replyDestination) {
+            this.message = message;
+            this.exchange = exchange;
+            this.endpoint = endpoint;
+            this.sendReply = sendReply;
+            this.replyDestination = replyDestination;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
             LOG.trace("onMessage.process END");
 
             // now we evaluate the processing of the exchange and determine if 
it was a success or failure
@@ -100,6 +165,7 @@ public class EndpointMessageListener imp
             // if we send back a reply it can either be the message body or 
transferring a caused exception
             org.apache.camel.Message body = null;
             Exception cause = null;
+            RuntimeCamelException rce = null;
 
             if (exchange.isFailed() || exchange.isRollbackOnly()) {
                 if (exchange.isRollbackOnly()) {
@@ -140,18 +206,18 @@ public class EndpointMessageListener imp
                 LOG.trace("onMessage.sendReply END");
             }
 
-        } catch (Exception e) {
-            rce = wrapRuntimeCamelException(e);
-        }
-
-        // an exception occurred so rethrow to trigger rollback on JMS listener
-        if (rce != null) {
-            handleException(rce);
-            LOG.trace("onMessage END throwing exception: {}", 
rce.getMessage());
-            throw rce;
+            // if an exception occurred
+            if (rce != null) {
+                if (doneSync) {
+                    // we were done sync, so put exception on exchange, so we 
can grab it in the onMessage
+                    // method and rethrow it
+                    exchange.setException(rce);
+                } else {
+                    // we were done async, so use the Camel built in exception 
handler to deal with it
+                    handleException(rce);
+                }
+            }
         }
-
-        LOG.trace("onMessage END");
     }
 
     public Exchange createExchange(Message message, Object replyDestination) {
@@ -245,6 +311,20 @@ public class EndpointMessageListener imp
         this.replyToDestination = replyToDestination;
     }
 
+    public boolean isAsync() {
+        return async;
+    }
+
+    /**
+     * Sets whether asynchronous routing is enabled.
+     * <p/>
+     * By default this is <tt>false</tt>. If configured as <tt>true</tt> then
+     * this listener will process the {@link org.apache.camel.Exchange} 
asynchronous.
+     */
+    public void setAsync(boolean async) {
+        this.async = async;
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 
@@ -327,4 +407,8 @@ public class EndpointMessageListener imp
         getExceptionHandler().handleException(t);
     }
 
+    @Override
+    public String toString() {
+        return "EndpointMessageListener[" + endpoint + "]";
+    }
 }

Modified: 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1188558&r1=1188557&r2=1188558&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
 (original)
+++ 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
 Tue Oct 25 07:57:46 2011
@@ -348,6 +348,14 @@ public class JmsComponent extends Defaul
         getConfiguration().setPreserveMessageQos(preserveMessageQos);
     }
 
+    public void setAsyncConsumer(boolean asyncConsumer) {
+        configuration.setAsyncConsumer(asyncConsumer);
+    }
+
+    public boolean isAsyncConsumer() {
+        return configuration.isAsyncConsumer();
+    }
+
     public void setApplicationContext(ApplicationContext applicationContext) 
throws BeansException {
         this.applicationContext = applicationContext;
     }

Modified: 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1188558&r1=1188557&r2=1188558&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 (original)
+++ 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 Tue Oct 25 07:57:46 2011
@@ -125,6 +125,7 @@ public class JmsConfiguration implements
     // to force disabling time to live (works in both in-only or in-out mode)
     private boolean disableTimeToLive;
     private ReplyToType replyToType;
+    private boolean asyncConsumer;
 
     public JmsConfiguration() {
     }
@@ -971,7 +972,7 @@ public class JmsConfiguration implements
         }
     }
 
-    public void configure(EndpointMessageListener listener) {
+    public void configureMessageListener(EndpointMessageListener listener) {
         if (isDisableReplyTo()) {
             listener.setDisableReplyTo(true);
         }
@@ -1192,4 +1193,19 @@ public class JmsConfiguration implements
     public void setReplyToType(ReplyToType replyToType) {
         this.replyToType = replyToType;
     }
+
+    public boolean isAsyncConsumer() {
+        return asyncConsumer;
+    }
+
+    /**
+     * Sets whether asynchronous routing is enabled on {@link JmsConsumer}.
+     * <p/>
+     * By default this is <tt>false</tt>. If configured as <tt>true</tt> then
+     * the {@link JmsConsumer} will process the {@link 
org.apache.camel.Exchange} asynchronous.
+     */
+    public void setAsyncConsumer(boolean asyncConsumer) {
+        this.asyncConsumer = asyncConsumer;
+    }
+
 }

Modified: 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1188558&r1=1188557&r2=1188558&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
 (original)
+++ 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
 Tue Oct 25 07:57:46 2011
@@ -64,7 +64,9 @@ public class JmsConsumer extends Default
 
     protected void createMessageListener(JmsEndpoint endpoint, Processor 
processor) {
         messageListener = new EndpointMessageListener(endpoint, processor);
+        
getEndpoint().getConfiguration().configureMessageListener(messageListener);
         messageListener.setBinding(endpoint.getBinding());
+        
messageListener.setAsync(endpoint.getConfiguration().isAsyncConsumer());
     }
 
     protected void createMessageListenerContainer() throws Exception {

Modified: 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1188558&r1=1188557&r2=1188558&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 Tue Oct 25 07:57:46 2011
@@ -1014,6 +1014,16 @@ public class JmsEndpoint extends Default
     }
 
     @ManagedAttribute
+    public void setAsyncConsumer(boolean asyncConsumer) {
+        configuration.setAsyncConsumer(asyncConsumer);
+    }
+
+    @ManagedAttribute
+    public boolean isAsyncConsumer() {
+        return configuration.isAsyncConsumer();
+    }
+
+    @ManagedAttribute
     public String getReplyToType() {
         if (configuration.getReplyToType() != null) {
             return configuration.getReplyToType().name();

Added: 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerFalseTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerFalseTest.java?rev=1188558&view=auto
==============================================================================
--- 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerFalseTest.java
 (added)
+++ 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerFalseTest.java
 Tue Oct 25 07:57:46 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.async;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jms.CamelJmsTestHelper;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static 
org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ *
+ */
+public class AsyncConsumerFalseTest extends CamelTestSupport {
+
+    @Test
+    public void testAsyncJmsConsumer() throws Exception {
+        // async is disabled (so we should receive in same order)
+        getMockEndpoint("mock:result").expectedBodiesReceived("Camel", "Hello 
World");
+
+        template.sendBody("activemq:queue:start", "Hello Camel");
+        template.sendBody("activemq:queue:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        camelContext.addComponent("async", new MyAsyncComponent());
+
+        ConnectionFactory connectionFactory = 
CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("activemq", 
jmsComponentAutoAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // disable async in only mode on the consumer
+                from("activemq:queue:start?asyncConsumer=false")
+                        .choice()
+                            .when(body().contains("Camel"))
+                            .to("async:camel?delay=2000")
+                            .to("mock:result")
+                        .otherwise()
+                            .to("mock:result");
+            }
+        };
+    }
+}

Added: 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTest.java?rev=1188558&view=auto
==============================================================================
--- 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTest.java
 (added)
+++ 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTest.java
 Tue Oct 25 07:57:46 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.async;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jms.CamelJmsTestHelper;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static 
org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ *
+ */
+public class AsyncConsumerInOutTest extends CamelTestSupport {
+
+    @Test
+    public void testAsyncJmsConsumer() throws Exception {
+        // Hello World is received first despite its send last
+        // the reason is that the first message is processed asynchronously
+        // and it takes 2 sec to complete, so in between we have time to
+        // process the 2nd message on the queue
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", 
"Bye Camel");
+
+        template.sendBody("activemq:queue:start", "Hello Camel");
+        template.sendBody("activemq:queue:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        camelContext.addComponent("async", new MyAsyncComponent());
+
+        ConnectionFactory connectionFactory = 
CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("activemq", 
jmsComponentAutoAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // enable async in only mode on the consumer
+                from("activemq:queue:start?asyncConsumer=true")
+                        .choice()
+                            .when(body().contains("Camel"))
+                            .to("async:camel?delay=2000")
+                            .inOut("activemq:queue:camel")
+                            .to("mock:result")
+                        .otherwise()
+                            .to("log:other")
+                            .to("mock:result");
+
+                from("activemq:queue:camel")
+                    .to("log:camel")
+                    .transform(constant("Bye Camel"));
+            }
+        };
+    }
+}

Added: 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTwoTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTwoTest.java?rev=1188558&view=auto
==============================================================================
--- 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTwoTest.java
 (added)
+++ 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerInOutTwoTest.java
 Tue Oct 25 07:57:46 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.async;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jms.CamelJmsTestHelper;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static 
org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ *
+ */
+public class AsyncConsumerInOutTwoTest extends CamelTestSupport {
+
+    @Test
+    public void testAsyncJmsConsumer() throws Exception {
+        String out = template.requestBody("activemq:queue:start", "Hello 
World", String.class);
+        assertEquals("Bye World", out);
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        camelContext.addComponent("async", new MyAsyncComponent());
+
+        ConnectionFactory connectionFactory = 
CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("activemq", 
jmsComponentAutoAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // enable async in only mode on the consumer
+                from("activemq:queue:start?asyncConsumer=true")
+                    .to("async:camel?delay=2000")
+                    .transform(constant("Bye World"));
+            }
+        };
+    }
+}

Added: 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java?rev=1188558&view=auto
==============================================================================
--- 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java
 (added)
+++ 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java
 Tue Oct 25 07:57:46 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.async;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jms.CamelJmsTestHelper;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static 
org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ *
+ */
+public class AsyncConsumerTest extends CamelTestSupport {
+
+    @Test
+    public void testAsyncJmsConsumer() throws Exception {
+        // Hello World is received first despite its send last
+        // the reason is that the first message is processed asynchronously
+        // and it takes 2 sec to complete, so in between we have time to
+        // process the 2nd message on the queue
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", 
"Camel");
+
+        template.sendBody("activemq:queue:start", "Hello Camel");
+        template.sendBody("activemq:queue:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        camelContext.addComponent("async", new MyAsyncComponent());
+
+        ConnectionFactory connectionFactory = 
CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("activemq", 
jmsComponentAutoAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // enable async in only mode on the consumer
+                from("activemq:queue:start?asyncConsumer=true")
+                        .choice()
+                            .when(body().contains("Camel"))
+                            .to("async:camel?delay=2000")
+                            .to("mock:result")
+                        .otherwise()
+                            .to("mock:result");
+            }
+        };
+    }
+}

Propchange: 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncConsumerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java?rev=1188558&r1=1188557&r2=1188558&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java
 (original)
+++ 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java
 Tue Oct 25 07:57:46 2011
@@ -19,8 +19,6 @@ package org.apache.camel.component.jms.i
 import javax.jms.ConnectionFactory;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.jms.CamelJmsTestHelper;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -47,11 +45,8 @@ public class JmsJMSReplyToConsumerEndpoi
         return new RouteBuilder() {
             public void configure() throws Exception {
                 from("activemq:queue:hello?replyTo=queue:namedReplyQueue")
-                    .process(new Processor() {
-                        public void process(Exchange exchange) throws 
Exception {
-                            exchange.getOut().setBody("My name is Camel");
-                        }
-                    });
+                    .to("log:hello")
+                    .transform(constant("My name is Camel"));
             }
         };
     }

Modified: 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsJettyAsyncTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsJettyAsyncTest.java?rev=1188558&r1=1188557&r2=1188558&view=diff
==============================================================================
--- 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsJettyAsyncTest.java
 (original)
+++ 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsJettyAsyncTest.java
 Tue Oct 25 07:57:46 2011
@@ -31,8 +31,7 @@ import org.junit.Test;
  */
 public class JmsJettyAsyncTest extends CamelTestSupport {
 
-    // TODO: When async jms consumer is implemented we can bump this value to 
1000
-    private int size = 10;
+    private int size = 100;
     private int port;
 
     @Test
@@ -54,7 +53,8 @@ public class JmsJettyAsyncTest extends C
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("activemq:queue:inbox?synchronous=false")
+                // enable async consumer to process messages faster
+                from("activemq:queue:inbox?asyncConsumer=false")
                     .to("jetty:http://0.0.0.0:"; + port + "/myapp")
                     .to("log:result?groupSize=10", "mock:result");
 


Reply via email to