Author: davsclaus
Date: Wed Jun 16 16:40:37 2010
New Revision: 955294

URL: http://svn.apache.org/viewvc?rev=955294&view=rev
Log:
CAMEL-2723: Interceptors is now support async routing.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/VerySimpleDirectTest.java
      - copied, changed from r955211, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleDirectTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java
   (with props)
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java
   (with props)
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java
   (with props)
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithStreamCachingTest.java
      - copied, changed from r955211, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java
   (with props)
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
Wed Jun 16 16:40:37 2010
@@ -29,10 +29,8 @@ import org.apache.camel.FailedToCreatePr
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.ProducerCallback;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.ServicePoolAware;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
-import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.processor.UnitOfWorkProducer;
 import org.apache.camel.spi.ServicePool;
 import org.apache.camel.util.CamelContextHelper;

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
 Wed Jun 16 16:40:37 2010
@@ -41,6 +41,11 @@ public class AsyncProcessorTypeConverter
         }
 
         public boolean process(Exchange exchange, AsyncCallback callback) {
+            if (processor == null) {
+                // no processor then we are done
+                callback.done(true);
+                return true;
+            }
             try {
                 processor.process(exchange);
             } catch (Throwable e) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
 Wed Jun 16 16:40:37 2010
@@ -16,9 +16,10 @@
  */
 package org.apache.camel.management;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.management.mbean.ManagedPerformanceCounter;
-import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.DelegateAsyncProcessor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -28,7 +29,7 @@ import org.apache.commons.logging.LogFac
  *
  * @version $Revision$
  */
-public class InstrumentationProcessor extends DelegateProcessor {
+public class InstrumentationProcessor extends DelegateAsyncProcessor {
 
     private static final transient Log LOG = 
LogFactory.getLog(InstrumentationProcessor.class);
     private PerformanceCounter counter;
@@ -54,26 +55,30 @@ public class InstrumentationProcessor ex
         }
     }
 
-    public void process(Exchange exchange) throws Exception {
-        if (processor != null) {
-
-            // use nano time as its more accurate
-            long startTime = -1;
-            if (counter != null && counter.isStatisticsEnabled()) {
-                startTime = System.nanoTime();
-            }
-
-            try {
-                processor.process(exchange);
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+        // use nano time as its more accurate
+        // and only record time if stats is enabled
+        long start = -1;
+        if (counter != null && counter.isStatisticsEnabled()) {
+            start = System.nanoTime();
+        }
+        final long startTime = start;
 
-            if (startTime != -1) {
-                long diff = (System.nanoTime() - startTime) / 1000000;
-                recordTime(exchange, diff);
+        return super.process(exchange, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                try {
+                    // record end time
+                    if (startTime > -1) {
+                        long diff = (System.nanoTime() - startTime) / 1000000;
+                        recordTime(exchange, diff);
+                    }
+                } finally {
+                    // and let the original callback know we are done as well
+                    callback.done(doneSync);
+                }
             }
-        }
+        });
     }
 
     protected void recordTime(Exchange exchange, long duration) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
 Wed Jun 16 16:40:37 2010
@@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.AlreadyStoppedException;
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.commons.logging.Log;
@@ -33,7 +34,7 @@ import org.apache.commons.logging.LogFac
  * 
  * @version $Revision$
  */
-public abstract class DelayProcessorSupport extends DelegateProcessor {
+public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
     protected final transient Log log = LogFactory.getLog(getClass());
     private final CountDownLatch stoppedLatch = new CountDownLatch(1);
     private boolean fastStop = true;
@@ -42,9 +43,14 @@ public abstract class DelayProcessorSupp
         super(processor);
     }
 
-    public void process(Exchange exchange) throws Exception {
-        delay(exchange);
-        super.process(exchange);
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            delay(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+        return super.process(exchange, callback);
     }
 
     public boolean isFastStop() {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
 Wed Jun 16 16:40:37 2010
@@ -25,6 +25,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ServiceHelper;
 
@@ -48,6 +49,10 @@ public class DelegateAsyncProcessor exte
         this.processor = processor;
     }
 
+    public DelegateAsyncProcessor(Processor processor) {
+        this(AsyncProcessorTypeConverter.convert(processor));
+    }
+
     @Override
     public String toString() {
         return "DelegateAsync[" + processor + "]";
@@ -57,6 +62,14 @@ public class DelegateAsyncProcessor exte
         return processor;
     }
 
+    public void setProcessor(AsyncProcessor processor) {
+        this.processor = processor;
+    }
+
+    public void setProcessor(Processor processor) {
+        this.processor = AsyncProcessorTypeConverter.convert(processor);
+    }
+
     protected void doStart() throws Exception {
         ServiceHelper.startServices(processor);
     }
@@ -66,6 +79,11 @@ public class DelegateAsyncProcessor exte
     }
 
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+        if (processor == null) {
+            // no processor then we are done
+            callback.done(true);
+            return true;
+        }
         return processor.process(exchange, callback);
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 Wed Jun 16 16:40:37 2010
@@ -131,6 +131,9 @@ public abstract class RedeliveryErrorHan
                     // okay we want to continue then prepare the exchange for 
that as well
                     prepareExchangeForContinue(exchange, data);
                 }
+
+                // we are breaking out so invoke the callback
+                callback.done(data.sync);
                 // and then return
                 return data.sync;
             }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
 Wed Jun 16 16:40:37 2010
@@ -21,7 +21,6 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultUnitOfWork;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.spi.RouteContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +37,7 @@ public final class UnitOfWorkProcessor e
     private final RouteContext routeContext;
 
     public UnitOfWorkProcessor(Processor processor) {
-        this(null, AsyncProcessorTypeConverter.convert(processor));
+        this(null, processor);
     }
 
     public UnitOfWorkProcessor(AsyncProcessor processor) {
@@ -46,7 +45,8 @@ public final class UnitOfWorkProcessor e
     }
 
     public UnitOfWorkProcessor(RouteContext routeContext, Processor processor) 
{
-        this(routeContext, AsyncProcessorTypeConverter.convert(processor));
+        super(processor);
+        this.routeContext = routeContext;
     }
 
     public UnitOfWorkProcessor(RouteContext routeContext, AsyncProcessor 
processor) {
@@ -81,8 +81,11 @@ public final class UnitOfWorkProcessor e
                 public void done(boolean doneSync) {
                     // Order here matters. We need to complete the callbacks
                     // since they will likely update the exchange with some 
final results.
-                    callback.done(doneSync);
-                    doneUow(uow, exchange);
+                    try {
+                        callback.done(doneSync);
+                    } finally {
+                        doneUow(uow, exchange);
+                    }
                 }
             });
         } else {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
 Wed Jun 16 16:40:37 2010
@@ -51,9 +51,8 @@ public class Delayer implements Intercep
         return null;
     }
 
-    public Processor wrapProcessorInInterceptors(CamelContext context, 
-            ProcessorDefinition<?> definition, Processor target, Processor 
nextTarget) throws Exception {
-        
+    public Processor wrapProcessorInInterceptors(CamelContext context, 
ProcessorDefinition<?> definition,
+                                                 Processor target, Processor 
nextTarget) throws Exception {
         return new DelayInterceptor(definition, target, this);
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java
 Wed Jun 16 16:40:37 2010
@@ -16,20 +16,20 @@
  */
 package org.apache.camel.processor.interceptor;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.DelegateAsyncProcessor;
 
-public class HandleFaultInterceptor extends DelegateProcessor {
+public class HandleFaultInterceptor extends DelegateAsyncProcessor {
 
     public HandleFaultInterceptor() {
         super();
     }
 
     public HandleFaultInterceptor(Processor processor) {
-        this();
-        setProcessor(processor);
+        super(processor);
     }
 
     @Override
@@ -38,18 +38,18 @@ public class HandleFaultInterceptor exte
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
-        if (processor == null) {
-            return;
-        }
-
-        try {
-            processor.process(exchange);
-        } catch (Exception e) {
-            exchange.setException(e);
-        }
-
-        handleFault(exchange);
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+        return getProcessor().process(exchange, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                try {
+                    // handle fault after we are done
+                    handleFault(exchange);
+                } finally {
+                    // and let the original callback know we are done as well
+                    callback.done(doneSync);
+                }
+            }
+        });
     }
 
     /**
@@ -62,8 +62,8 @@ public class HandleFaultInterceptor exte
             if (faultBody != null && exchange.getException() == null) {
                 // remove fault as we are converting it to an exception
                 exchange.setOut(null);
-                if (faultBody instanceof Exception) {
-                    exchange.setException((Exception) faultBody);
+                if (faultBody instanceof Throwable) {
+                    exchange.setException((Throwable) faultBody);
                 } else {
                     // wrap it in an exception
                     exchange.setException(new 
CamelException(faultBody.toString()));

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
 Wed Jun 16 16:40:37 2010
@@ -16,24 +16,25 @@
  */
 package org.apache.camel.processor.interceptor;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.StreamCache;
-import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.DelegateAsyncProcessor;
 import org.apache.camel.util.MessageHelper;
 
 /**
- * {...@link DelegateProcessor} that converts a message into a re-readable 
format
+ * An interceptor that converts streams messages into a re-readable format
+ * by wrapping the stream into a {...@link StreamCache}.
  */
-public class StreamCachingInterceptor extends DelegateProcessor {
+public class StreamCachingInterceptor extends DelegateAsyncProcessor {
 
     public StreamCachingInterceptor() {
         super();
     }
 
     public StreamCachingInterceptor(Processor processor) {
-        this();
-        setProcessor(processor);
+        super(processor);
     }
 
     @Override
@@ -42,14 +43,14 @@ public class StreamCachingInterceptor ex
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
         if (newBody != null) {
             exchange.getIn().setBody(newBody);
         }
         MessageHelper.resetStreamCache(exchange.getIn());
 
-        getProcessor().process(exchange);
+        return getProcessor().process(exchange, callback);
     }
 
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
 Wed Jun 16 16:40:37 2010
@@ -32,7 +32,6 @@ import org.apache.camel.impl.DoCatchRout
 import org.apache.camel.impl.DoFinallyRouteNode;
 import org.apache.camel.impl.OnCompletionRouteNode;
 import org.apache.camel.impl.OnExceptionRouteNode;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.model.AggregateDefinition;
 import org.apache.camel.model.CatchDefinition;
 import org.apache.camel.model.FinallyDefinition;
@@ -47,7 +46,6 @@ import org.apache.camel.spi.ExchangeForm
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.TracedRouteNodes;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -73,7 +71,7 @@ public class TraceInterceptor extends De
     private String jpaTraceEventMessageClassName;
 
     public TraceInterceptor(ProcessorDefinition node, Processor target, 
TraceFormatter formatter, Tracer tracer) {
-        super(AsyncProcessorTypeConverter.convert(target));
+        super(target);
         this.tracer = tracer;
         this.node = node;
         this.formatter = formatter;
@@ -94,10 +92,6 @@ public class TraceInterceptor extends De
         this.routeContext = routeContext;
     }
 
-    public void process(final Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         // do not trace if tracing is disabled
@@ -178,7 +172,7 @@ public class TraceInterceptor extends De
                 // process the exchange
                 try {
                     sync = super.process(exchange, callback);
-                } catch (Exception e) {
+                } catch (Throwable e) {
                     exchange.setException(e);
                 }
             } finally {
@@ -188,7 +182,7 @@ public class TraceInterceptor extends De
                     traceExchangeOut(exchange, traceState);
                 }
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             // some exception occurred in trace logic
             if (shouldLogException(exchange)) {
                 logException(exchange, e);
@@ -196,7 +190,6 @@ public class TraceInterceptor extends De
             exchange.setException(e);
         }
 
-        callback.done(sync);
         return sync;
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
 Wed Jun 16 16:40:37 2010
@@ -29,19 +29,24 @@ import org.apache.camel.model.ProcessorD
  */
 public interface InterceptStrategy {
 
+    // TODO: We should force this strategy to return AsyncProcessor so custom 
interceptors work nicely with async
+
     /**
      * This method is invoked by
      * {...@link ProcessorDefinition#wrapProcessor(RouteContext, Processor)}
      * to give the implementor an opportunity to wrap the target processor
      * in a route.
+     * <p/>
+     * Its adviced to use an {...@link org.apache.camel.AsyncProcessor} as the 
returned wrapped
+     * {...@link Processor} which ensures the interceptor works well with the 
asynchronous routing engine.
      *
      * @param context       Camel context
      * @param definition    the model this interceptor represents
      * @param target        the processor to be wrapped
      * @param nextTarget    the next processor to be routed to
-     * @return processor wrapped with an interceptor or not wrapped
+     * @return processor    wrapped with an interceptor or not wrapped.
      * @throws Exception can be thrown
      */
-    Processor wrapProcessorInInterceptors(CamelContext context, 
-            ProcessorDefinition<?> definition, Processor target, Processor 
nextTarget) throws Exception;
+    Processor wrapProcessorInInterceptors(CamelContext context, 
ProcessorDefinition<?> definition, 
+                                          Processor target, Processor 
nextTarget) throws Exception;
 }

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/VerySimpleDirectTest.java
 (from r955211, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleDirectTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/VerySimpleDirectTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/VerySimpleDirectTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleDirectTest.java&r1=955211&r2=955294&rev=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleDirectTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/VerySimpleDirectTest.java
 Wed Jun 16 16:40:37 2010
@@ -22,11 +22,9 @@ import org.apache.camel.builder.RouteBui
 /**
  * @version $Revision$
  */
-public class SimpleDirectTest extends ContextTestSupport {
+public class VerySimpleDirectTest extends ContextTestSupport {
 
-    public void testSimpleDirect() throws Exception {
-        getMockEndpoint("mock:foo").expectedMessageCount(1);
-        getMockEndpoint("mock:bar").expectedMessageCount(1);
+    public void testVerySimpleDirect() throws Exception {
         getMockEndpoint("mock:result").expectedMessageCount(1);
 
         template.sendBody("direct:start", "Hello World");
@@ -39,11 +37,8 @@ public class SimpleDirectTest extends Co
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                
from("direct:start").to("direct:foo").to("direct:bar").to("mock:result");
-
-                from("direct:foo").to("mock:foo");
-                from("direct:bar").to("mock:bar");
+                from("direct:start").to("mock:result");
             }
         };
     }
-}
+}
\ No newline at end of file

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java?rev=955294&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java
 Wed Jun 16 16:40:37 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointWithDelayerTest extends ContextTestSupport {
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+
+    public void testAsyncEndpoint() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+        String reply = template.requestBody("direct:start", "Hello Camel", 
String.class);
+        assertEquals("Bye Camel", reply);
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", 
beforeThreadName.equalsIgnoreCase(afterThreadName));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                // enable delayer to ensure it works using async API
+                from("direct:start").delayer(100)
+                        .to("mock:before")
+                        .to("log:before")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                beforeThreadName = 
Thread.currentThread().getName();
+                            }
+                        })
+                        .to("async:foo")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                afterThreadName = 
Thread.currentThread().getName();
+                            }
+                        })
+                        .to("log:after")
+                        .to("mock:after")
+                        .to("mock:result");
+            }
+        };
+    }
+
+}
\ No newline at end of file

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java?rev=955294&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java
 Wed Jun 16 16:40:37 2010
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointWithHandleFaultTest extends ContextTestSupport {
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+
+    public void testAsyncEndpoint() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedMessageCount(0);
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        Exchange reply = template.request("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello Camel");
+            }
+        });
+        assertNotNull(reply);
+        assertTrue(reply.isFailed());
+        assertNotNull(reply.getException());
+        assertEquals("Faulty Bye Camel", reply.getException().getMessage());
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", 
beforeThreadName.equalsIgnoreCase(afterThreadName));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                // enable handle fault to ensure it works using async API
+                from("direct:start").handleFault()
+                        .to("mock:before")
+                        .to("log:before")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                beforeThreadName = 
Thread.currentThread().getName();
+                            }
+                        })
+                        .to("async:foo")
+                        .to("log:after")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                afterThreadName = 
Thread.currentThread().getName();
+                                exchange.getOut().setFault(true);
+                                exchange.getOut().setBody("Faulty Bye Camel");
+                            }
+                        })
+                        .to("mock:after")
+                        .to("mock:result");
+            }
+        };
+    }
+
+}
\ No newline at end of file

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java?rev=955294&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java
 Wed Jun 16 16:40:37 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointWithJMXTest extends ContextTestSupport {
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+
+    @Override
+    protected boolean useJmx() {
+        return true;
+    }
+
+    public void testAsyncEndpoint() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+        String reply = template.requestBody("direct:start", "Hello Camel", 
String.class);
+        assertEquals("Bye Camel", reply);
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", 
beforeThreadName.equalsIgnoreCase(afterThreadName));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                from("direct:start")
+                        .to("mock:before")
+                        .to("log:before")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                beforeThreadName = 
Thread.currentThread().getName();
+                            }
+                        })
+                        .to("async:foo")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                afterThreadName = 
Thread.currentThread().getName();
+                            }
+                        })
+                        .to("log:after")
+                        .to("mock:after")
+                        .to("mock:result");
+            }
+        };
+    }
+
+}
\ No newline at end of file

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithStreamCachingTest.java
 (from r955211, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithStreamCachingTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithStreamCachingTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=955211&r2=955294&rev=955294&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithStreamCachingTest.java
 Wed Jun 16 16:40:37 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
 /**
  * @version $Revision$
  */
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointWithStreamCachingTest extends ContextTestSupport {
 
     private static String beforeThreadName;
     private static String afterThreadName;
@@ -49,7 +49,8 @@ public class AsyncEndpointTest extends C
             public void configure() throws Exception {
                 context.addComponent("async", new MyAsyncComponent());
 
-                from("direct:start")
+                // enable stream caching to ensure it works using async API
+                from("direct:start").streamCaching().tracing()
                         .to("mock:before")
                         .to("log:before")
                         .process(new Processor() {
@@ -70,4 +71,4 @@ public class AsyncEndpointTest extends C
         };
     }
 
-}
+}
\ No newline at end of file

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java?rev=955294&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java
 Wed Jun 16 16:40:37 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointWithTracingTest extends ContextTestSupport {
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+
+    public void testAsyncEndpoint() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+        String reply = template.requestBody("direct:start", "Hello Camel", 
String.class);
+        assertEquals("Bye Camel", reply);
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", 
beforeThreadName.equalsIgnoreCase(afterThreadName));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                // enable tracing to ensure it works using async API
+                from("direct:start").tracing()
+                        .to("mock:before")
+                        .to("log:before")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                beforeThreadName = 
Thread.currentThread().getName();
+                            }
+                        })
+                        .to("async:foo")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                afterThreadName = 
Thread.currentThread().getName();
+                            }
+                        })
+                        .to("log:after")
+                        .to("mock:after")
+                        .to("mock:result");
+            }
+        };
+    }
+
+}
\ No newline at end of file

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to