Author: janstey Date: Tue Dec 1 19:19:25 2009 New Revision: 885876 URL: http://svn.apache.org/viewvc?rev=885876&view=rev Log: CAMEL-2245 - stop routing slip when exchange is failed
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipWithErrorHandlerTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipWithExceptionTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=885876&r1=885875&r2=885876&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java Tue Dec 1 19:19:25 2009 @@ -28,6 +28,8 @@ import org.apache.camel.impl.ServiceSupport; import org.apache.camel.model.RoutingSlipDefinition; import org.apache.camel.util.ExchangeHelper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import static org.apache.camel.util.ObjectHelper.notNull; @@ -37,6 +39,7 @@ * dependent on the value of a message header. */ public class RoutingSlip extends ServiceSupport implements Processor, Traceable { + private static final transient Log LOG = LogFactory.getLog(RoutingSlip.class); private ProducerCache producerCache; private final String header; private final String uriDelimiter; @@ -74,20 +77,54 @@ updateRoutingSlip(current); copyOutToIn(copy, current); - getProducerCache(exchange).doInProducer(endpoint, copy, null, new ProducerCallback<Object>() { - public Object doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception { - // set property which endpoint we send to - exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); - producer.process(exchange); - return exchange; + try { + getProducerCache(exchange).doInProducer(endpoint, copy, null, new ProducerCallback<Object>() { + public Object doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception { + // set property which endpoint we send to + exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); + producer.process(exchange); + return exchange; + } + }); + } catch (Exception e) { + // catch exception so we can decide if we want to continue or not + copy.setException(e); + } finally { + current = copy; + } + + // Decide whether to continue with the recipients or not; similar logic to the Pipeline + boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(current); + if (current.isFailed() || current.isRollbackOnly() || exceptionHandled) { + // The Exchange.ERRORHANDLED_HANDLED property is only set if satisfactory handling was done + // by the error handler. It's still an exception, the exchange still failed. + if (LOG.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append("Message exchange has failed so breaking out of the routing slip: ").append(current); + if (current.isRollbackOnly()) { + sb.append(" Marked as rollback only."); + } + if (current.getException() != null) { + sb.append(" Exception: ").append(current.getException()); + } + if (current.hasOut() && current.getOut().isFault()) { + sb.append(" Fault: ").append(current.getOut()); + } + if (exceptionHandled) { + sb.append(" Handled by the error handler."); + } + LOG.debug(sb.toString()); } - }); - - current = copy; + break; + } } ExchangeHelper.copyResults(exchange, current); } + private static boolean hasExceptionBeenHandledByErrorHandler(Exchange nextExchange) { + return Boolean.TRUE.equals(nextExchange.getProperty(Exchange.ERRORHANDLER_HANDLED)); + } + protected ProducerCache getProducerCache(Exchange exchange) throws Exception { // setup producer cache as we need to use the pluggable service pool defined on camel context if (producerCache == null) { Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipWithErrorHandlerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipWithErrorHandlerTest.java?rev=885876&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipWithErrorHandlerTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipWithErrorHandlerTest.java Tue Dec 1 19:19:25 2009 @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.routingslip; + +import org.apache.camel.builder.RouteBuilder; + +public class RoutingSlipWithErrorHandlerTest extends RoutingSlipWithExceptionTest { + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + onException(Exception.class).handled(true).to("mock:exception"); + + from("direct:start").routingSlip(ROUTING_SLIP_HEADER).to("mock:noexception"); + } + }; + } +} Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipWithExceptionTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipWithExceptionTest.java?rev=885876&r1=885875&r2=885876&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipWithExceptionTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipWithExceptionTest.java Tue Dec 1 19:19:25 2009 @@ -35,11 +35,13 @@ private MockEndpoint endEndpoint; private MockEndpoint exceptionEndpoint; private MockEndpoint exceptionSettingEndpoint; + private MockEndpoint aEndpoint; public void testNoException() throws Exception { endEndpoint.expectedMessageCount(1); exceptionEndpoint.expectedMessageCount(0); - + aEndpoint.expectedMessageCount(1); + sendRoutingSlipWithNoExceptionThrowingComponent(); assertEndpointsSatisfied(); @@ -48,6 +50,7 @@ public void testWithExceptionThrowingComponentFirstInList() throws Exception { endEndpoint.expectedMessageCount(0); exceptionEndpoint.expectedMessageCount(1); + aEndpoint.expectedMessageCount(0); sendRoutingSlipWithExceptionThrowingComponentFirstInList(); @@ -57,6 +60,7 @@ public void testWithExceptionThrowingComponentSecondInList() throws Exception { endEndpoint.expectedMessageCount(0); exceptionEndpoint.expectedMessageCount(1); + aEndpoint.expectedMessageCount(1); sendRoutingSlipWithExceptionThrowingComponentSecondInList(); @@ -66,6 +70,7 @@ public void testWithExceptionSettingComponentFirstInList() throws Exception { endEndpoint.expectedMessageCount(0); exceptionEndpoint.expectedMessageCount(1); + aEndpoint.expectedMessageCount(0); sendRoutingSlipWithExceptionSettingComponentFirstInList(); @@ -75,6 +80,7 @@ public void testWithExceptionSettingComponentSecondInList() throws Exception { endEndpoint.expectedMessageCount(0); exceptionEndpoint.expectedMessageCount(1); + aEndpoint.expectedMessageCount(1); sendRoutingSlipWithExceptionSettingComponentSecondInList(); @@ -82,18 +88,18 @@ } private void assertEndpointsSatisfied() throws InterruptedException { - MockEndpoint.assertIsSatisfied(5, TimeUnit.SECONDS, endEndpoint, exceptionEndpoint); + MockEndpoint.assertIsSatisfied(5, TimeUnit.SECONDS, endEndpoint, exceptionEndpoint, aEndpoint); } protected void sendRoutingSlipWithExceptionThrowingComponentFirstInList() { template.sendBodyAndHeader("direct:start", ANSWER, ROUTING_SLIP_HEADER, - "myBean?method=throwException,mock:x"); + "bean:myBean?method=throwException,mock:a"); } protected void sendRoutingSlipWithExceptionThrowingComponentSecondInList() { template.sendBodyAndHeader("direct:start", ANSWER, ROUTING_SLIP_HEADER, - "mock:a,myBean?method=throwException"); + "mock:a,bean:myBean?method=throwException"); } protected void sendRoutingSlipWithNoExceptionThrowingComponent() { @@ -118,7 +124,8 @@ endEndpoint = resolveMandatoryEndpoint("mock:noexception", MockEndpoint.class); exceptionEndpoint = resolveMandatoryEndpoint("mock:exception", MockEndpoint.class); exceptionSettingEndpoint = resolveMandatoryEndpoint("mock:exceptionSetting", MockEndpoint.class); - + aEndpoint = resolveMandatoryEndpoint("mock:a", MockEndpoint.class); + exceptionSettingEndpoint.whenAnyExchangeReceived(new Processor() { public void process(Exchange exchange) throws Exception { exchange.setException(new Exception("Throw me!"));