Author: davsclaus
Date: Tue Dec 28 13:10:59 2010
New Revision: 1053342

URL: http://svn.apache.org/viewvc?rev=1053342&view=rev
Log:
CAMEL-3285: Polished code a bit due to code review.

Modified:
    
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
    
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
    
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
    
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
    
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
    
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
    
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
    
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
    
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
    
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java

Modified: 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
 (original)
+++ 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
 Tue Dec 28 13:10:59 2010
@@ -30,7 +30,7 @@ import org.apache.camel.component.routeb
 import org.apache.camel.impl.DefaultComponent;
 
 public class RouteboxComponent extends DefaultComponent {
-    RouteboxConfiguration config;
+    final RouteboxConfiguration config;
     private final Map<String, BlockingQueue<Exchange>> queues = new 
HashMap<String, BlockingQueue<Exchange>>();
     
     public RouteboxComponent() {

Modified: 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
 (original)
+++ 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
 Tue Dec 28 13:10:59 2010
@@ -27,7 +27,6 @@ import org.apache.camel.ProducerTemplate
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.routebox.strategy.RouteboxDispatchStrategy;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.DefaultProducerTemplate;
 import org.apache.camel.spi.Registry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -57,7 +56,7 @@ public class RouteboxConfiguration {
     public RouteboxConfiguration() {
     }
 
-    public RouteboxConfiguration(URI uri) throws Exception {
+    public RouteboxConfiguration(URI uri) {
         this();
         this.uri = uri;
     }
@@ -71,7 +70,9 @@ public class RouteboxConfiguration {
         
         setUri(uri);
         setAuthority(uri.getAuthority());
-        LOG.info("Authority: " + uri.getAuthority());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Authority: " + uri.getAuthority());
+        }
         
         setEndpointName(getAuthority());
         
@@ -113,31 +114,28 @@ public class RouteboxConfiguration {
         }
         
         if (parameters.containsKey("innerRegistry")) {
-            innerRegistry = (Registry) 
component.resolveAndRemoveReferenceParameter(parameters, "innerRegistry", 
Registry.class);
+            innerRegistry = 
component.resolveAndRemoveReferenceParameter(parameters, "innerRegistry", 
Registry.class);
         }
         
         if (isForkContext()) {
             if (innerRegistry != null) {
-                innerContext = (CamelContext) 
component.resolveAndRemoveReferenceParameter(parameters, "innerContext", 
CamelContext.class, new DefaultCamelContext(innerRegistry));
+                innerContext = 
component.resolveAndRemoveReferenceParameter(parameters, "innerContext", 
CamelContext.class, new DefaultCamelContext(innerRegistry));
             } else {
-                innerContext = (CamelContext) 
component.resolveAndRemoveReferenceParameter(parameters, "innerContext", 
CamelContext.class, new DefaultCamelContext());
+                innerContext = 
component.resolveAndRemoveReferenceParameter(parameters, "innerContext", 
CamelContext.class, new DefaultCamelContext());
             }
-
         } else {
             innerContext = component.getCamelContext();
         }
         
-        //configureInnerContext();
-        innerProducerTemplate = new DefaultProducerTemplate(innerContext);
-        innerProducerTemplate.start();
+        innerProducerTemplate = innerContext.createProducerTemplate();
         setQueueSize(component.getAndRemoveParameter(parameters, "size", 
Integer.class, 0));
         consumerUri = component.resolveAndRemoveReferenceParameter(parameters, 
"consumerUri", URI.class, new URI("routebox:" + getEndpointName()));
         producerUri = component.resolveAndRemoveReferenceParameter(parameters, 
"producerUri", URI.class, new URI("routebox:" + getEndpointName()));        
         
         dispatchStrategy = 
component.resolveAndRemoveReferenceParameter(parameters, "dispatchStrategy", 
RouteboxDispatchStrategy.class, null);
         dispatchMap = (HashMap<String, String>) 
component.resolveAndRemoveReferenceParameter(parameters, "dispatchMap", 
HashMap.class, new HashMap<String, String>());
-        if ((dispatchStrategy == null) && (dispatchMap == null)) { 
-            LOG.warn("No Routebox Dispatch Map or Strategy has been set. 
Routebox may not have more than one inner route");
+        if (dispatchStrategy == null && dispatchMap == null) {
+            LOG.warn("No Routebox Dispatch Map or Strategy has been set. 
Routebox may not have more than one inner route.");
         }        
     }
 

Modified: 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
 (original)
+++ 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
 Tue Dec 28 13:10:59 2010
@@ -21,19 +21,24 @@ import java.util.concurrent.ExecutorServ
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.spi.ExceptionHandler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 public abstract class RouteboxServiceSupport extends ServiceSupport {
-    private static final transient Log LOG = 
LogFactory.getLog(RouteboxServiceSupport.class);
+    private final transient Log log = LogFactory.getLog(getClass());
+    private ExceptionHandler exceptionHandler;
     private RouteboxEndpoint endpoint;
     private ExecutorService executor;
-    private int pendingExchanges;
-    private boolean startedInnerContext;
-    
+    private volatile boolean startedInnerContext;
+
     public RouteboxServiceSupport(RouteboxEndpoint endpoint) {
         this.endpoint = endpoint;
+        if (exceptionHandler == null) {
+            exceptionHandler = new LoggingExceptionHandler(getClass());
+        }
     }
     
     protected void doStopInnerContext() throws Exception {
@@ -48,8 +53,8 @@ public abstract class RouteboxServiceSup
         List<RouteBuilder> routeBuildersList = 
endpoint.getConfig().getRouteBuilders();
         if (!(routeBuildersList.isEmpty())) {
             for (RouteBuilder routeBuilder : routeBuildersList) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Adding routebuilder " + routeBuilder + " to " + 
context.getName());
+                if (log.isDebugEnabled()) {
+                    log.debug("Adding RouteBuilder " + routeBuilder + " to " + 
context.getName());
                 }
                 context.addRoutes(routeBuilder);
             }
@@ -59,14 +64,6 @@ public abstract class RouteboxServiceSup
         setStartedInnerContext(true);
     }
 
-    public void setPendingExchanges(int pendingExchanges) {
-        this.pendingExchanges = pendingExchanges;
-    }
-
-    public int getPendingExchanges() {
-        return pendingExchanges;
-    }
-
     public RouteboxEndpoint getRouteboxEndpoint() {
         return endpoint;
     }
@@ -83,14 +80,19 @@ public abstract class RouteboxServiceSup
         this.executor = executor;
     }
 
-
     public void setStartedInnerContext(boolean startedInnerContext) {
         this.startedInnerContext = startedInnerContext;
     }
 
-
     public boolean isStartedInnerContext() {
         return startedInnerContext;
     }
 
+    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
+    public ExceptionHandler getExceptionHandler() {
+        return exceptionHandler;
+    }
 }

Modified: 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
 (original)
+++ 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
 Tue Dec 28 13:10:59 2010
@@ -17,24 +17,20 @@
 package org.apache.camel.component.routebox.direct;
 
 import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.component.routebox.RouteboxConsumer;
+import org.apache.camel.component.routebox.RouteboxEndpoint;
 import org.apache.camel.component.routebox.RouteboxServiceSupport;
-import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
-import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ShutdownAware;
 
 public class RouteboxDirectConsumer extends RouteboxServiceSupport implements 
RouteboxConsumer, ShutdownAware, SuspendableService {
     protected ProducerTemplate producer;
     private final Processor processor;
     private volatile AsyncProcessor asyncProcessor;
-    private ExceptionHandler exceptionHandler;
 
     public RouteboxDirectConsumer(RouteboxDirectEndpoint endpoint, Processor 
processor) {
         super(endpoint);
@@ -44,33 +40,31 @@ public class RouteboxDirectConsumer exte
     
     protected void doStart() throws Exception {
         // add consumer to endpoint
-        boolean existing = this == 
((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer();
-        if (!existing && 
((RouteboxDirectEndpoint)getRouteboxEndpoint()).hasConsumer(this)) {
-            throw new IllegalArgumentException("Cannot add a 2nd consumer to 
the same endpoint. Endpoint " + getRouteboxEndpoint() + " only allows one 
consumer.");
+        boolean existing = this == getEndpoint().getConsumer();
+        if (!existing && getEndpoint().hasConsumer(this)) {
+            throw new IllegalArgumentException("Cannot add a 2nd consumer to 
the same endpoint. Endpoint " + getEndpoint() + " only allows one consumer.");
         }
         if (!existing) {
-            ((RouteboxDirectEndpoint)getRouteboxEndpoint()).addConsumer(this);
+            getEndpoint().addConsumer(this);
         }
         
         // now start the inner context
         if (!isStartedInnerContext()) {
             doStartInnerContext(); 
         }
-        
     }
 
     protected void doStop() throws Exception {
-        ((RouteboxDirectEndpoint)getRouteboxEndpoint()).removeConsumer(this);
+        getEndpoint().removeConsumer(this);
         
         // now stop the inner context
         if (isStartedInnerContext()) {
             doStopInnerContext();
         }
-
     }
 
     protected void doSuspend() throws Exception {
-        ((RouteboxDirectEndpoint)getRouteboxEndpoint()).removeConsumer(this);
+        getEndpoint().removeConsumer(this);
     }
 
     protected void doResume() throws Exception {
@@ -78,11 +72,6 @@ public class RouteboxDirectConsumer exte
         doStart();
     }
     
-    public Exchange processRequest(Exchange exchange) {
-        return exchange;
-        
-    }
-    
     /**
     * Provides an {@link org.apache.camel.AsyncProcessor} interface to the 
configured
      * processor on the consumer. If the processor does not implement the 
interface,
@@ -95,54 +84,24 @@ public class RouteboxDirectConsumer exte
         return asyncProcessor;
     }
 
-    public ExceptionHandler getExceptionHandler() {
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
-        return exceptionHandler;
-    }
-
-    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
-        this.exceptionHandler = exceptionHandler;
-    }
-
-    /**
-     * Handles the given exception using the {@link #getExceptionHandler()}
-     * 
-     * @param t the exception to handle
-     */
-    protected void handleException(Throwable t) {
-        Throwable newt = (t == null) ? new IllegalArgumentException("Handling 
[null] exception") : t;
-        getExceptionHandler().handleException(newt);
-    }
-
-    /* (non-Javadoc)
-     * @see 
org.apache.camel.spi.ShutdownAware#deferShutdown(org.apache.camel.ShutdownRunningTask)
-     */
     public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
         // deny stopping on shutdown as we want direct consumers to run in 
case some other queues
         // depend on this consumer to run, so it can complete its exchanges
         return true;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.camel.spi.ShutdownAware#getPendingExchangesSize()
-     */
     public int getPendingExchangesSize() {
         // return 0 as we do not have an internal memory queue with a variable 
size
         // of inflight messages. 
         return 0;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.camel.spi.ShutdownAware#prepareShutdown()
-     */
     public void prepareShutdown() {
-        
+        // noop
     }
     
-    public Endpoint getEndpoint() {
-        return (Endpoint) getRouteboxEndpoint();
+    public RouteboxDirectEndpoint getEndpoint() {
+        return (RouteboxDirectEndpoint) getRouteboxEndpoint();
     }
 
     public Processor getProcessor() {

Modified: 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
 Tue Dec 28 13:10:59 2010
@@ -27,7 +27,7 @@ import org.apache.camel.component.routeb
 import org.apache.camel.component.routebox.RouteboxEndpoint;
 
 public class RouteboxDirectEndpoint extends RouteboxEndpoint {
-    private volatile Map<String, RouteboxDirectConsumer> consumers = new 
HashMap<String, RouteboxDirectConsumer>();
+    private final Map<String, RouteboxDirectConsumer> consumers = new 
HashMap<String, RouteboxDirectConsumer>();
 
     public RouteboxDirectEndpoint(String endpointUri) {
         super(endpointUri);

Modified: 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
 (original)
+++ 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
 Tue Dec 28 13:10:59 2010
@@ -26,9 +26,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.component.routebox.RouteboxServiceSupport;
 import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
-import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
-import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,21 +34,20 @@ import org.apache.commons.logging.LogFac
 public class RouteboxDirectProducer extends RouteboxServiceSupport implements 
Producer, AsyncProcessor {
     private static final transient Log LOG = 
LogFactory.getLog(RouteboxDirectProducer.class);
     protected ProducerTemplate producer;
-    private ExceptionHandler exceptionHandler;
-    
+
     public RouteboxDirectProducer(RouteboxDirectEndpoint endpoint) {
         super(endpoint);
         producer = endpoint.getConfig().getInnerProducerTemplate();
     }
 
     public void process(Exchange exchange) throws Exception {
-        Exchange result = null;
+        Exchange result;
         
         if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() == 
null) && (getRouteboxEndpoint().getConfig().isSendToConsumer())) {
             throw new CamelExchangeException("No consumers available on 
endpoint: " + getRouteboxEndpoint(), exchange);
         } else {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("**** Dispatching to Inner Route ****");
+                LOG.debug("Dispatching to Inner Route " + exchange);
             }
             RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer);
             result = dispatcher.dispatchSync(getRouteboxEndpoint(), exchange);
@@ -64,14 +61,14 @@ public class RouteboxDirectProducer exte
         boolean flag = true;
         
         if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() == 
null) 
-            && 
(((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer()))
 {
+            && ((getRouteboxEndpoint()).getConfig().isSendToConsumer())) {
             exchange.setException(new CamelExchangeException("No consumers 
available on endpoint: " + getRouteboxEndpoint(), exchange));
             callback.done(true);
             flag = true;
         } else {
             try {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("**** Dispatching to Inner Route ****");
+                    LOG.debug("Dispatching to Inner Route " + exchange);
                 }
                 
                 RouteboxDispatcher dispatcher = new 
RouteboxDispatcher(producer);
@@ -97,70 +94,46 @@ public class RouteboxDirectProducer exte
     }
     
     protected void doStart() throws Exception {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Starting producer: " + this);
-        }
-        
-        if 
(!((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer())
 {
+        if (!(getRouteboxEndpoint()).getConfig().isSendToConsumer()) {
             // start an inner context
             if (!isStartedInnerContext()) {
                 doStartInnerContext(); 
             }
         }
-        
     }
 
     protected void doStop() throws Exception {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Stopping producer: " + this);
-        }
-        
-        if 
(!((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer())
 {        
+        if (!(getRouteboxEndpoint()).getConfig().isSendToConsumer()) {
             // stop the inner context
             if (isStartedInnerContext()) {
                 doStopInnerContext();
             }
         }
-
     }
     
-    @Override
-    public String toString() {
-        return "Producer[" + getRouteboxEndpoint()
-                .getEndpointUri() + "]";
-    }
-
     public Endpoint getEndpoint() {
         return getRouteboxEndpoint();
     }
 
     public Exchange createExchange() {
-        return getRouteboxEndpoint()
-                .createExchange();
+        return getRouteboxEndpoint().createExchange();
     }
 
     public Exchange createExchange(ExchangePattern pattern) {
-        return getRouteboxEndpoint()
-                .createExchange(pattern);
+        return getRouteboxEndpoint().createExchange(pattern);
     }
 
     public Exchange createExchange(Exchange exchange) {
-        return getRouteboxEndpoint()
-                .createExchange(exchange);
+        return getRouteboxEndpoint().createExchange(exchange);
     }
 
     public boolean isSingleton() {
         return true;
     }
 
-    public ExceptionHandler getExceptionHandler() {
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
-        return exceptionHandler;
+    @Override
+    public String toString() {
+        return "Producer[" + getRouteboxEndpoint().getEndpointUri() + "]";
     }
 
-    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
-        this.exceptionHandler = exceptionHandler;
-    }
 }

Modified: 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
 (original)
+++ 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
 Tue Dec 28 13:10:59 2010
@@ -41,23 +41,13 @@ public class RouteboxSedaConsumer extend
     private static final transient Log LOG = 
LogFactory.getLog(RouteboxSedaConsumer.class);
     protected AsyncProcessor processor;
     protected ProducerTemplate producer;
-    private int pendingExchanges;
-    private ExceptionHandler exceptionHandler;
-    
+
     public RouteboxSedaConsumer(RouteboxSedaEndpoint endpoint, Processor 
processor) {
         super(endpoint);
         this.setProcessor(AsyncProcessorTypeConverter.convert(processor));
-        producer = endpoint.getConfig().getInnerProducerTemplate();
-        producer.setMaximumCacheSize(endpoint.getConfig().getThreads());
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
+        this.producer = endpoint.getConfig().getInnerProducerTemplate();
     }
 
-
-    /* (non-Javadoc)
-     * @see org.apache.camel.impl.ServiceSupport#doStart()
-     */
     @Override
     protected void doStart() throws Exception {
         ((RouteboxSedaEndpoint)getRouteboxEndpoint()).onStarted(this);
@@ -65,30 +55,24 @@ public class RouteboxSedaConsumer extend
         
         // Create a URI link from the primary context to routes in the new 
inner context
         int poolSize = getRouteboxEndpoint().getConfig().getThreads();
-        
setExecutor(((RouteboxSedaEndpoint)getRouteboxEndpoint()).getCamelContext().getExecutorServiceStrategy()
-                        .newFixedThreadPool(this, 
((RouteboxSedaEndpoint)getRouteboxEndpoint()).getEndpointUri(), poolSize));
+        
setExecutor(getRouteboxEndpoint().getCamelContext().getExecutorServiceStrategy()
+                .newFixedThreadPool(this, 
getRouteboxEndpoint().getEndpointUri(), poolSize));
         for (int i = 0; i < poolSize; i++) {
-            getExecutor().execute((Runnable) this);
+            getExecutor().execute(this);
         }
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.camel.impl.ServiceSupport#doStop()
-     */
     @Override
     protected void doStop() throws Exception {
         ((RouteboxSedaEndpoint)getRouteboxEndpoint()).onStopped(this);
         // Shutdown the executor
-        
((RouteboxSedaEndpoint)getRouteboxEndpoint()).getCamelContext().getExecutorServiceStrategy().shutdown(getExecutor());
+        
getRouteboxEndpoint().getCamelContext().getExecutorServiceStrategy().shutdown(getExecutor());
         setExecutor(null);
         
         doStopInnerContext(); 
     }
     
-    /* (non-Javadoc)
-     * @see java.lang.Runnable#run()
-     */
-    public void run() {       
+    public void run() {
         BlockingQueue<Exchange> queue = 
((RouteboxSedaEndpoint)getRouteboxEndpoint()).getQueue();
         while (queue != null && isRunAllowed()) {
             try {
@@ -104,13 +88,13 @@ public class RouteboxSedaConsumer extend
     }
     
     private void dispatchToInnerRoute(BlockingQueue<Exchange> queue, final 
Exchange exchange) throws InterruptedException {
-        Exchange result = null;
+        Exchange result;
 
         if (exchange != null) {
             if (isRunAllowed()) {
                 try {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("**** Dispatching to Inner Route ****");
+                        LOG.debug("Dispatching to inner route: " + exchange);
                     }
                     RouteboxDispatcher dispatcher = new 
RouteboxDispatcher(producer);
                     result = dispatcher.dispatchAsync(getRouteboxEndpoint(), 
exchange); 
@@ -131,33 +115,20 @@ public class RouteboxSedaConsumer extend
         }
     }
     
-    
-    /* (non-Javadoc)
-     * @see org.apache.camel.Consumer#getEndpoint()
-     */
     public Endpoint getEndpoint() {
-        return (Endpoint) getRouteboxEndpoint();
+        return getRouteboxEndpoint();
     }
 
-    /* (non-Javadoc)
-     * @see 
org.apache.camel.spi.ShutdownAware#deferShutdown(org.apache.camel.ShutdownRunningTask)
-     */
     public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
         return false;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.camel.spi.ShutdownAware#getPendingExchangesSize()
-     */
     public int getPendingExchangesSize() {
-        return getPendingExchanges();
+        // TODO: Get size of queue
+        return 0;
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.camel.spi.ShutdownAware#prepareShutdown()
-     */
     public void prepareShutdown() {
-        
     }
     
     public void setProcessor(AsyncProcessor processor) {
@@ -168,20 +139,4 @@ public class RouteboxSedaConsumer extend
         return processor;
     }
 
-    public void setPendingExchanges(int pendingExchanges) {
-        this.pendingExchanges = pendingExchanges;
-    }
-
-    public int getPendingExchanges() {
-        return pendingExchanges;
-    }
-
-    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
-        this.exceptionHandler = exceptionHandler;
-    }
-
-    public ExceptionHandler getExceptionHandler() {
-        return exceptionHandler;
-    }
-    
 }

Modified: 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
 Tue Dec 28 13:10:59 2010
@@ -111,16 +111,10 @@ public class RouteboxSedaEndpoint extend
         return queue;
     }
 
-    /* (non-Javadoc)
-     * @see 
org.apache.camel.MultipleConsumersSupport#isMultipleConsumersSupported()
-     */
     public boolean isMultipleConsumersSupported() {
         return true;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.camel.spi.BrowsableEndpoint#getExchanges()
-     */
     public List<Exchange> getExchanges() {
         return new ArrayList<Exchange>(getQueue());
     }

Modified: 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
 (original)
+++ 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
 Tue Dec 28 13:10:59 2010
@@ -25,13 +25,15 @@ import org.apache.camel.Exchange;
  * A strategy for identifying the route consumer in the routebox where the 
exchange should to be dispatched
  */
 public interface RouteboxDispatchStrategy {
+
     /**
      * Receives an incoming exchange and consumer list and identifies the 
inner route consumer for dispatching the exchange
      *
-     * @param innerRouteConsumers the list of possible real-time inner route 
consumers available 
+     * @param destinations the list of possible real-time inner route 
consumers available
      *        to where the exchange can be dispatched in the routebox
      * @param exchange the incoming exchange
      * @return a selected consumer to whom the exchange can be directed
+     * @throws Exception is thrown if error
      */
     URI selectDestinationUri(List<URI> destinations, Exchange exchange) throws 
Exception;
 } 

Modified: 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java
 (original)
+++ 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java
 Tue Dec 28 13:10:59 2010
@@ -25,7 +25,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.CamelException;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -40,7 +40,6 @@ import org.apache.camel.model.RouteDefin
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
 public class RouteboxDispatcher {
     private static final transient Log LOG = LogFactory.getLog(RouteboxDispatcher.class);
     private ProducerTemplate producer;
@@ -51,11 +50,11 @@ public class RouteboxDispatcher {
     }
 
     public Exchange dispatchSync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
-        URI dispatchUri = null;
-        Exchange reply = null;
+        URI dispatchUri;
+        Exchange reply;
         
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Dispatching exchange" + exchange + "to endpoint " + endpoint.getEndpointUri());
+            LOG.debug("Dispatching exchange " + exchange + " to endpoint " + endpoint.getEndpointUri());
         }
         
         dispatchUri = selectDispatchUri(endpoint, exchange);
@@ -63,18 +62,18 @@ public class RouteboxDispatcher {
         if (exchange.getPattern() == ExchangePattern.InOnly) {
             reply = producer.send(dispatchUri.toASCIIString(), exchange);
         } else {
-            reply = (Exchange) issueRequest(endpoint, ExchangePattern.InOut, exchange.getIn().getBody(), exchange.getIn().getHeaders());        
+            reply = issueRequest(endpoint, ExchangePattern.InOut, exchange.getIn().getBody(), exchange.getIn().getHeaders());
         }
 
         return reply;
     }
     
     public Exchange dispatchAsync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
-        URI dispatchUri = null;
-        Exchange reply = null;
+        URI dispatchUri;
+        Exchange reply;
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Dispatching exchange" + exchange + "to endpoint " + endpoint.getEndpointUri());
+            LOG.debug("Dispatching exchange " + exchange + " to endpoint " + endpoint.getEndpointUri());
         }
         
         dispatchUri = selectDispatchUri(endpoint, exchange);
@@ -91,26 +90,27 @@ public class RouteboxDispatcher {
     }
     
     protected URI selectDispatchUri(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
-        URI dispatchUri = null;
+        URI dispatchUri;
         
         List<URI> consumerUris = getInnerContextConsumerList(endpoint.getConfig().getInnerContext());
         if (consumerUris.isEmpty()) {
-            throw new CamelException("No routes found for dispatch in Routebox");
+            throw new CamelExchangeException("No routes found to dispatch in Routebox at " + endpoint, exchange);
         } else if (consumerUris.size() == 1) {
             dispatchUri = consumerUris.get(0);
         } else {
             if (!endpoint.getConfig().getDispatchMap().isEmpty()) {
-                //apply URI string found in dispatch Map
-                if (endpoint.getConfig().getDispatchMap().containsKey(exchange.getIn().getHeader("ROUTE_DISPATCH_KEY"))) {
-                    dispatchUri = new URI(endpoint.getConfig().getDispatchMap().get(exchange.getIn().getHeader("ROUTE_DISPATCH_KEY")));
+                // apply URI string found in dispatch Map
+                String key = exchange.getIn().getHeader("ROUTE_DISPATCH_KEY", String.class);
+                if (endpoint.getConfig().getDispatchMap().containsKey(key)) {
+                    dispatchUri = new URI(endpoint.getConfig().getDispatchMap().get(key));
                 } else {
-                    throw new CamelException("No matching entry found in Dispatch Map for ROUTE_DISPATCH_KEY: " + exchange.getIn().getHeader("ROUTE_DISPATCH_KEY"));
+                    throw new CamelExchangeException("No matching entry found in Dispatch Map for ROUTE_DISPATCH_KEY: " + key, exchange);
                 }
             } else {
-                //apply dispatch strategy
+                // apply dispatch strategy
                 dispatchUri = endpoint.getConfig().getDispatchStrategy().selectDestinationUri(consumerUris, exchange);
                 if (dispatchUri == null) {
-                    throw new CamelException("No matching inner routes found for Operation");
+                    throw new CamelExchangeException("No matching inner routes found for Operation", exchange);
                 }
             }
         }
@@ -138,9 +138,7 @@ public class RouteboxDispatcher {
         Exchange exchange = producer.send(endpoint, pattern, new Processor() {
             public void process(Exchange exchange) throws Exception {
                 Message in = exchange.getIn();
-                for (Map.Entry<String, Object> header : headers.entrySet()) {
-                    in.setHeader(header.getKey(), header.getValue());
-                }
+                in.getHeaders().putAll(headers);
                 in.setBody(body);
             }
         });


Reply via email to