Author: davsclaus Date: Tue Dec 28 13:10:59 2010 New Revision: 1053342 URL: http://svn.apache.org/viewvc?rev=1053342&view=rev Log: CAMEL-3285: Polished code a bit due to code review.
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java?rev=1053342&r1=1053341&r2=1053342&view=diff ============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java (original) +++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java Tue Dec 28 13:10:59 2010 @@ -30,7 +30,7 @@ import org.apache.camel.component.routeb import org.apache.camel.impl.DefaultComponent; public class RouteboxComponent 
extends DefaultComponent { - RouteboxConfiguration config; + final RouteboxConfiguration config; private final Map<String, BlockingQueue<Exchange>> queues = new HashMap<String, BlockingQueue<Exchange>>(); public RouteboxComponent() { Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java?rev=1053342&r1=1053341&r2=1053342&view=diff ============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java (original) +++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java Tue Dec 28 13:10:59 2010 @@ -27,7 +27,6 @@ import org.apache.camel.ProducerTemplate import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.routebox.strategy.RouteboxDispatchStrategy; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.DefaultProducerTemplate; import org.apache.camel.spi.Registry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,7 +56,7 @@ public class RouteboxConfiguration { public RouteboxConfiguration() { } - public RouteboxConfiguration(URI uri) throws Exception { + public RouteboxConfiguration(URI uri) { this(); this.uri = uri; } @@ -71,7 +70,9 @@ public class RouteboxConfiguration { setUri(uri); setAuthority(uri.getAuthority()); - LOG.info("Authority: " + uri.getAuthority()); + if (LOG.isTraceEnabled()) { + LOG.trace("Authority: " + uri.getAuthority()); + } setEndpointName(getAuthority()); @@ -113,31 +114,28 @@ public class RouteboxConfiguration { } if (parameters.containsKey("innerRegistry")) { - innerRegistry = (Registry) component.resolveAndRemoveReferenceParameter(parameters, 
"innerRegistry", Registry.class); + innerRegistry = component.resolveAndRemoveReferenceParameter(parameters, "innerRegistry", Registry.class); } if (isForkContext()) { if (innerRegistry != null) { - innerContext = (CamelContext) component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext(innerRegistry)); + innerContext = component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext(innerRegistry)); } else { - innerContext = (CamelContext) component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext()); + innerContext = component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext()); } - } else { innerContext = component.getCamelContext(); } - //configureInnerContext(); - innerProducerTemplate = new DefaultProducerTemplate(innerContext); - innerProducerTemplate.start(); + innerProducerTemplate = innerContext.createProducerTemplate(); setQueueSize(component.getAndRemoveParameter(parameters, "size", Integer.class, 0)); consumerUri = component.resolveAndRemoveReferenceParameter(parameters, "consumerUri", URI.class, new URI("routebox:" + getEndpointName())); producerUri = component.resolveAndRemoveReferenceParameter(parameters, "producerUri", URI.class, new URI("routebox:" + getEndpointName())); dispatchStrategy = component.resolveAndRemoveReferenceParameter(parameters, "dispatchStrategy", RouteboxDispatchStrategy.class, null); dispatchMap = (HashMap<String, String>) component.resolveAndRemoveReferenceParameter(parameters, "dispatchMap", HashMap.class, new HashMap<String, String>()); - if ((dispatchStrategy == null) && (dispatchMap == null)) { - LOG.warn("No Routebox Dispatch Map or Strategy has been set. 
Routebox may not have more than one inner route"); + if (dispatchStrategy == null && dispatchMap == null) { + LOG.warn("No Routebox Dispatch Map or Strategy has been set. Routebox may not have more than one inner route."); } } Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java?rev=1053342&r1=1053341&r2=1053342&view=diff ============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java (original) +++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java Tue Dec 28 13:10:59 2010 @@ -21,19 +21,24 @@ import java.util.concurrent.ExecutorServ import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.ServiceSupport; +import org.apache.camel.spi.ExceptionHandler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public abstract class RouteboxServiceSupport extends ServiceSupport { - private static final transient Log LOG = LogFactory.getLog(RouteboxServiceSupport.class); + private final transient Log log = LogFactory.getLog(getClass()); + private ExceptionHandler exceptionHandler; private RouteboxEndpoint endpoint; private ExecutorService executor; - private int pendingExchanges; - private boolean startedInnerContext; - + private volatile boolean startedInnerContext; + public RouteboxServiceSupport(RouteboxEndpoint endpoint) { this.endpoint = endpoint; + if (exceptionHandler == null) { + exceptionHandler = new LoggingExceptionHandler(getClass()); + } } protected void doStopInnerContext() throws Exception { @@ -48,8 
+53,8 @@ public abstract class RouteboxServiceSup List<RouteBuilder> routeBuildersList = endpoint.getConfig().getRouteBuilders(); if (!(routeBuildersList.isEmpty())) { for (RouteBuilder routeBuilder : routeBuildersList) { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding routebuilder " + routeBuilder + " to " + context.getName()); + if (log.isDebugEnabled()) { + log.debug("Adding RouteBuilder " + routeBuilder + " to " + context.getName()); } context.addRoutes(routeBuilder); } @@ -59,14 +64,6 @@ public abstract class RouteboxServiceSup setStartedInnerContext(true); } - public void setPendingExchanges(int pendingExchanges) { - this.pendingExchanges = pendingExchanges; - } - - public int getPendingExchanges() { - return pendingExchanges; - } - public RouteboxEndpoint getRouteboxEndpoint() { return endpoint; } @@ -83,14 +80,19 @@ public abstract class RouteboxServiceSup this.executor = executor; } - public void setStartedInnerContext(boolean startedInnerContext) { this.startedInnerContext = startedInnerContext; } - public boolean isStartedInnerContext() { return startedInnerContext; } + public void setExceptionHandler(ExceptionHandler exceptionHandler) { + this.exceptionHandler = exceptionHandler; + } + + public ExceptionHandler getExceptionHandler() { + return exceptionHandler; + } } Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java?rev=1053342&r1=1053341&r2=1053342&view=diff ============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java (original) +++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java Tue Dec 28 13:10:59 
2010 @@ -17,24 +17,20 @@ package org.apache.camel.component.routebox.direct; import org.apache.camel.AsyncProcessor; -import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ProducerTemplate; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.SuspendableService; import org.apache.camel.component.routebox.RouteboxConsumer; +import org.apache.camel.component.routebox.RouteboxEndpoint; import org.apache.camel.component.routebox.RouteboxServiceSupport; -import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; -import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.ShutdownAware; public class RouteboxDirectConsumer extends RouteboxServiceSupport implements RouteboxConsumer, ShutdownAware, SuspendableService { protected ProducerTemplate producer; private final Processor processor; private volatile AsyncProcessor asyncProcessor; - private ExceptionHandler exceptionHandler; public RouteboxDirectConsumer(RouteboxDirectEndpoint endpoint, Processor processor) { super(endpoint); @@ -44,33 +40,31 @@ public class RouteboxDirectConsumer exte protected void doStart() throws Exception { // add consumer to endpoint - boolean existing = this == ((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer(); - if (!existing && ((RouteboxDirectEndpoint)getRouteboxEndpoint()).hasConsumer(this)) { - throw new IllegalArgumentException("Cannot add a 2nd consumer to the same endpoint. Endpoint " + getRouteboxEndpoint() + " only allows one consumer."); + boolean existing = this == getEndpoint().getConsumer(); + if (!existing && getEndpoint().hasConsumer(this)) { + throw new IllegalArgumentException("Cannot add a 2nd consumer to the same endpoint. 
Endpoint " + getEndpoint() + " only allows one consumer."); } if (!existing) { - ((RouteboxDirectEndpoint)getRouteboxEndpoint()).addConsumer(this); + getEndpoint().addConsumer(this); } // now start the inner context if (!isStartedInnerContext()) { doStartInnerContext(); } - } protected void doStop() throws Exception { - ((RouteboxDirectEndpoint)getRouteboxEndpoint()).removeConsumer(this); + getEndpoint().removeConsumer(this); // now stop the inner context if (isStartedInnerContext()) { doStopInnerContext(); } - } protected void doSuspend() throws Exception { - ((RouteboxDirectEndpoint)getRouteboxEndpoint()).removeConsumer(this); + getEndpoint().removeConsumer(this); } protected void doResume() throws Exception { @@ -78,11 +72,6 @@ public class RouteboxDirectConsumer exte doStart(); } - public Exchange processRequest(Exchange exchange) { - return exchange; - - } - /** * Provides an {@link org.apache.camel.AsyncProcessor} interface to the configured * processor on the consumer. If the processor does not implement the interface, @@ -95,54 +84,24 @@ public class RouteboxDirectConsumer exte return asyncProcessor; } - public ExceptionHandler getExceptionHandler() { - if (exceptionHandler == null) { - exceptionHandler = new LoggingExceptionHandler(getClass()); - } - return exceptionHandler; - } - - public void setExceptionHandler(ExceptionHandler exceptionHandler) { - this.exceptionHandler = exceptionHandler; - } - - /** - * Handles the given exception using the {@link #getExceptionHandler()} - * - * @param t the exception to handle - */ - protected void handleException(Throwable t) { - Throwable newt = (t == null) ? 
new IllegalArgumentException("Handling [null] exception") : t; - getExceptionHandler().handleException(newt); - } - - /* (non-Javadoc) - * @see org.apache.camel.spi.ShutdownAware#deferShutdown(org.apache.camel.ShutdownRunningTask) - */ public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { // deny stopping on shutdown as we want direct consumers to run in case some other queues // depend on this consumer to run, so it can complete its exchanges return true; } - /* (non-Javadoc) - * @see org.apache.camel.spi.ShutdownAware#getPendingExchangesSize() - */ public int getPendingExchangesSize() { // return 0 as we do not have an internal memory queue with a variable size // of inflight messages. return 0; } - /* (non-Javadoc) - * @see org.apache.camel.spi.ShutdownAware#prepareShutdown() - */ public void prepareShutdown() { - + // noop } - public Endpoint getEndpoint() { - return (Endpoint) getRouteboxEndpoint(); + public RouteboxDirectEndpoint getEndpoint() { + return (RouteboxDirectEndpoint) getRouteboxEndpoint(); } public Processor getProcessor() { Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java?rev=1053342&r1=1053341&r2=1053342&view=diff ============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java (original) +++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java Tue Dec 28 13:10:59 2010 @@ -27,7 +27,7 @@ import org.apache.camel.component.routeb import org.apache.camel.component.routebox.RouteboxEndpoint; public class RouteboxDirectEndpoint extends RouteboxEndpoint { - private volatile Map<String, 
RouteboxDirectConsumer> consumers = new HashMap<String, RouteboxDirectConsumer>(); + private final Map<String, RouteboxDirectConsumer> consumers = new HashMap<String, RouteboxDirectConsumer>(); public RouteboxDirectEndpoint(String endpointUri) { super(endpointUri); Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java?rev=1053342&r1=1053341&r2=1053342&view=diff ============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java (original) +++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java Tue Dec 28 13:10:59 2010 @@ -26,9 +26,7 @@ import org.apache.camel.Producer; import org.apache.camel.ProducerTemplate; import org.apache.camel.component.routebox.RouteboxServiceSupport; import org.apache.camel.component.routebox.strategy.RouteboxDispatcher; -import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; -import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,21 +34,20 @@ import org.apache.commons.logging.LogFac public class RouteboxDirectProducer extends RouteboxServiceSupport implements Producer, AsyncProcessor { private static final transient Log LOG = LogFactory.getLog(RouteboxDirectProducer.class); protected ProducerTemplate producer; - private ExceptionHandler exceptionHandler; - + public RouteboxDirectProducer(RouteboxDirectEndpoint endpoint) { super(endpoint); producer = endpoint.getConfig().getInnerProducerTemplate(); } 
public void process(Exchange exchange) throws Exception { - Exchange result = null; + Exchange result; if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() == null) && (getRouteboxEndpoint().getConfig().isSendToConsumer())) { throw new CamelExchangeException("No consumers available on endpoint: " + getRouteboxEndpoint(), exchange); } else { if (LOG.isDebugEnabled()) { - LOG.debug("**** Dispatching to Inner Route ****"); + LOG.debug("Dispatching to Inner Route " + exchange); } RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer); result = dispatcher.dispatchSync(getRouteboxEndpoint(), exchange); @@ -64,14 +61,14 @@ public class RouteboxDirectProducer exte boolean flag = true; if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() == null) - && (((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer())) { + && ((getRouteboxEndpoint()).getConfig().isSendToConsumer())) { exchange.setException(new CamelExchangeException("No consumers available on endpoint: " + getRouteboxEndpoint(), exchange)); callback.done(true); flag = true; } else { try { if (LOG.isDebugEnabled()) { - LOG.debug("**** Dispatching to Inner Route ****"); + LOG.debug("Dispatching to Inner Route " + exchange); } RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer); @@ -97,70 +94,46 @@ public class RouteboxDirectProducer exte } protected void doStart() throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Starting producer: " + this); - } - - if (!((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer()) { + if (!(getRouteboxEndpoint()).getConfig().isSendToConsumer()) { // start an inner context if (!isStartedInnerContext()) { doStartInnerContext(); } } - } protected void doStop() throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Stopping producer: " + this); - } - - if (!((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer()) { + if 
(!(getRouteboxEndpoint()).getConfig().isSendToConsumer()) { // stop the inner context if (isStartedInnerContext()) { doStopInnerContext(); } } - } - @Override - public String toString() { - return "Producer[" + getRouteboxEndpoint() - .getEndpointUri() + "]"; - } - public Endpoint getEndpoint() { return getRouteboxEndpoint(); } public Exchange createExchange() { - return getRouteboxEndpoint() - .createExchange(); + return getRouteboxEndpoint().createExchange(); } public Exchange createExchange(ExchangePattern pattern) { - return getRouteboxEndpoint() - .createExchange(pattern); + return getRouteboxEndpoint().createExchange(pattern); } public Exchange createExchange(Exchange exchange) { - return getRouteboxEndpoint() - .createExchange(exchange); + return getRouteboxEndpoint().createExchange(exchange); } public boolean isSingleton() { return true; } - public ExceptionHandler getExceptionHandler() { - if (exceptionHandler == null) { - exceptionHandler = new LoggingExceptionHandler(getClass()); - } - return exceptionHandler; + @Override + public String toString() { + return "Producer[" + getRouteboxEndpoint().getEndpointUri() + "]"; } - public void setExceptionHandler(ExceptionHandler exceptionHandler) { - this.exceptionHandler = exceptionHandler; - } } Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java?rev=1053342&r1=1053341&r2=1053342&view=diff ============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java (original) +++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java Tue Dec 28 13:10:59 2010 @@ -41,23 +41,13 @@ public class 
RouteboxSedaConsumer extend private static final transient Log LOG = LogFactory.getLog(RouteboxSedaConsumer.class); protected AsyncProcessor processor; protected ProducerTemplate producer; - private int pendingExchanges; - private ExceptionHandler exceptionHandler; - + public RouteboxSedaConsumer(RouteboxSedaEndpoint endpoint, Processor processor) { super(endpoint); this.setProcessor(AsyncProcessorTypeConverter.convert(processor)); - producer = endpoint.getConfig().getInnerProducerTemplate(); - producer.setMaximumCacheSize(endpoint.getConfig().getThreads()); - if (exceptionHandler == null) { - exceptionHandler = new LoggingExceptionHandler(getClass()); - } + this.producer = endpoint.getConfig().getInnerProducerTemplate(); } - - /* (non-Javadoc) - * @see org.apache.camel.impl.ServiceSupport#doStart() - */ @Override protected void doStart() throws Exception { ((RouteboxSedaEndpoint)getRouteboxEndpoint()).onStarted(this); @@ -65,30 +55,24 @@ public class RouteboxSedaConsumer extend // Create a URI link from the primary context to routes in the new inner context int poolSize = getRouteboxEndpoint().getConfig().getThreads(); - setExecutor(((RouteboxSedaEndpoint)getRouteboxEndpoint()).getCamelContext().getExecutorServiceStrategy() - .newFixedThreadPool(this, ((RouteboxSedaEndpoint)getRouteboxEndpoint()).getEndpointUri(), poolSize)); + setExecutor(getRouteboxEndpoint().getCamelContext().getExecutorServiceStrategy() + .newFixedThreadPool(this, getRouteboxEndpoint().getEndpointUri(), poolSize)); for (int i = 0; i < poolSize; i++) { - getExecutor().execute((Runnable) this); + getExecutor().execute(this); } } - /* (non-Javadoc) - * @see org.apache.camel.impl.ServiceSupport#doStop() - */ @Override protected void doStop() throws Exception { ((RouteboxSedaEndpoint)getRouteboxEndpoint()).onStopped(this); // Shutdown the executor - ((RouteboxSedaEndpoint)getRouteboxEndpoint()).getCamelContext().getExecutorServiceStrategy().shutdown(getExecutor()); + 
getRouteboxEndpoint().getCamelContext().getExecutorServiceStrategy().shutdown(getExecutor()); setExecutor(null); doStopInnerContext(); } - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - public void run() { + public void run() { BlockingQueue<Exchange> queue = ((RouteboxSedaEndpoint)getRouteboxEndpoint()).getQueue(); while (queue != null && isRunAllowed()) { try { @@ -104,13 +88,13 @@ public class RouteboxSedaConsumer extend } private void dispatchToInnerRoute(BlockingQueue<Exchange> queue, final Exchange exchange) throws InterruptedException { - Exchange result = null; + Exchange result; if (exchange != null) { if (isRunAllowed()) { try { if (LOG.isDebugEnabled()) { - LOG.debug("**** Dispatching to Inner Route ****"); + LOG.debug("Dispatching to inner route: " + exchange); } RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer); result = dispatcher.dispatchAsync(getRouteboxEndpoint(), exchange); @@ -131,33 +115,20 @@ public class RouteboxSedaConsumer extend } } - - /* (non-Javadoc) - * @see org.apache.camel.Consumer#getEndpoint() - */ public Endpoint getEndpoint() { - return (Endpoint) getRouteboxEndpoint(); + return getRouteboxEndpoint(); } - /* (non-Javadoc) - * @see org.apache.camel.spi.ShutdownAware#deferShutdown(org.apache.camel.ShutdownRunningTask) - */ public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { return false; } - /* (non-Javadoc) - * @see org.apache.camel.spi.ShutdownAware#getPendingExchangesSize() - */ public int getPendingExchangesSize() { - return getPendingExchanges(); + // TODO: Get size of queue + return 0; } - /* (non-Javadoc) - * @see org.apache.camel.spi.ShutdownAware#prepareShutdown() - */ public void prepareShutdown() { - } public void setProcessor(AsyncProcessor processor) { @@ -168,20 +139,4 @@ public class RouteboxSedaConsumer extend return processor; } - public void setPendingExchanges(int pendingExchanges) { - this.pendingExchanges = pendingExchanges; - } - - public int getPendingExchanges() { 
- return pendingExchanges; - } - - public void setExceptionHandler(ExceptionHandler exceptionHandler) { - this.exceptionHandler = exceptionHandler; - } - - public ExceptionHandler getExceptionHandler() { - return exceptionHandler; - } - } Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java?rev=1053342&r1=1053341&r2=1053342&view=diff ============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java (original) +++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java Tue Dec 28 13:10:59 2010 @@ -111,16 +111,10 @@ public class RouteboxSedaEndpoint extend return queue; } - /* (non-Javadoc) - * @see org.apache.camel.MultipleConsumersSupport#isMultipleConsumersSupported() - */ public boolean isMultipleConsumersSupported() { return true; } - /* (non-Javadoc) - * @see org.apache.camel.spi.BrowsableEndpoint#getExchanges() - */ public List<Exchange> getExchanges() { return new ArrayList<Exchange>(getQueue()); } Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java?rev=1053342&r1=1053341&r2=1053342&view=diff ============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java (original) +++ 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java Tue Dec 28 13:10:59 2010 @@ -25,13 +25,15 @@ import org.apache.camel.Exchange; * A strategy for identifying the route consumer in the routebox where the exchange should to be dispatched */ public interface RouteboxDispatchStrategy { + /** * Receives an incoming exchange and consumer list and identifies the inner route consumer for dispatching the exchange * - * @param innerRouteConsumers the list of possible real-time inner route consumers available + * @param destinations the list of possible real-time inner route consumers available * to where the exchange can be dispatched in the routebox * @param exchange the incoming exchange * @return a selected consumer to whom the exchange can be directed + * @throws Exception is thrown if error */ URI selectDestinationUri(List<URI> destinations, Exchange exchange) throws Exception; } Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java?rev=1053342&r1=1053341&r2=1053342&view=diff ============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java (original) +++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java Tue Dec 28 13:10:59 2010 @@ -25,7 +25,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; -import org.apache.camel.CamelException; +import org.apache.camel.CamelExchangeException; import org.apache.camel.CamelExecutionException; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ 
-40,7 +40,6 @@ import org.apache.camel.model.RouteDefin import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - public class RouteboxDispatcher { private static final transient Log LOG = LogFactory.getLog(RouteboxDispatcher.class); private ProducerTemplate producer; @@ -51,11 +50,11 @@ public class RouteboxDispatcher { } public Exchange dispatchSync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception { - URI dispatchUri = null; - Exchange reply = null; + URI dispatchUri; + Exchange reply; if (LOG.isDebugEnabled()) { - LOG.debug("Dispatching exchange" + exchange + "to endpoint " + endpoint.getEndpointUri()); + LOG.debug("Dispatching exchange " + exchange + " to endpoint " + endpoint.getEndpointUri()); } dispatchUri = selectDispatchUri(endpoint, exchange); @@ -63,18 +62,18 @@ public class RouteboxDispatcher { if (exchange.getPattern() == ExchangePattern.InOnly) { reply = producer.send(dispatchUri.toASCIIString(), exchange); } else { - reply = (Exchange) issueRequest(endpoint, ExchangePattern.InOut, exchange.getIn().getBody(), exchange.getIn().getHeaders()); + reply = issueRequest(endpoint, ExchangePattern.InOut, exchange.getIn().getBody(), exchange.getIn().getHeaders()); } return reply; } public Exchange dispatchAsync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception { - URI dispatchUri = null; - Exchange reply = null; + URI dispatchUri; + Exchange reply; if (LOG.isDebugEnabled()) { - LOG.debug("Dispatching exchange" + exchange + "to endpoint " + endpoint.getEndpointUri()); + LOG.debug("Dispatching exchange " + exchange + " to endpoint " + endpoint.getEndpointUri()); } dispatchUri = selectDispatchUri(endpoint, exchange); @@ -91,26 +90,27 @@ public class RouteboxDispatcher { } protected URI selectDispatchUri(RouteboxEndpoint endpoint, Exchange exchange) throws Exception { - URI dispatchUri = null; + URI dispatchUri; List<URI> consumerUris = getInnerContextConsumerList(endpoint.getConfig().getInnerContext()); if 
(consumerUris.isEmpty()) { - throw new CamelException("No routes found for dispatch in Routebox"); + throw new CamelExchangeException("No routes found to dispatch in Routebox at " + endpoint, exchange); } else if (consumerUris.size() == 1) { dispatchUri = consumerUris.get(0); } else { if (!endpoint.getConfig().getDispatchMap().isEmpty()) { - //apply URI string found in dispatch Map - if (endpoint.getConfig().getDispatchMap().containsKey(exchange.getIn().getHeader("ROUTE_DISPATCH_KEY"))) { - dispatchUri = new URI(endpoint.getConfig().getDispatchMap().get(exchange.getIn().getHeader("ROUTE_DISPATCH_KEY"))); + // apply URI string found in dispatch Map + String key = exchange.getIn().getHeader("ROUTE_DISPATCH_KEY", String.class); + if (endpoint.getConfig().getDispatchMap().containsKey(key)) { + dispatchUri = new URI(endpoint.getConfig().getDispatchMap().get(key)); } else { - throw new CamelException("No matching entry found in Dispatch Map for ROUTE_DISPATCH_KEY: " + exchange.getIn().getHeader("ROUTE_DISPATCH_KEY")); + throw new CamelExchangeException("No matching entry found in Dispatch Map for ROUTE_DISPATCH_KEY: " + key, exchange); } } else { - //apply dispatch strategy + // apply dispatch strategy dispatchUri = endpoint.getConfig().getDispatchStrategy().selectDestinationUri(consumerUris, exchange); if (dispatchUri == null) { - throw new CamelException("No matching inner routes found for Operation"); + throw new CamelExchangeException("No matching inner routes found for Operation", exchange); } } } @@ -138,9 +138,7 @@ public class RouteboxDispatcher { Exchange exchange = producer.send(endpoint, pattern, new Processor() { public void process(Exchange exchange) throws Exception { Message in = exchange.getIn(); - for (Map.Entry<String, Object> header : headers.entrySet()) { - in.setHeader(header.getKey(), header.getValue()); - } + in.getHeaders().putAll(headers); in.setBody(body); } });