Author: ningjiang Date: Wed May 12 11:54:53 2010 New Revision: 943458 URL: http://svn.apache.org/viewvc?rev=943458&view=rev Log: CAMEL-2710 Skip invalid endpoints without throwing an exception
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=943458&r1=943457&r2=943458&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Wed May 12 11:54:53 2010 @@ -57,6 +57,8 @@ public class RecipientListDefinition<Typ private String executorServiceRef; @XmlAttribute(required = false) private Boolean stopOnException; + @XmlAttribute(required = false) + private Boolean ignoreInvalidEndpoints; public RecipientListDefinition() { } @@ -94,6 +96,9 @@ public class RecipientListDefinition<Typ if (stopOnException != null) { answer.setStopOnException(isStopOnException()); } + if (ignoreInvalidEndpoints != null) { + answer.setIgnoreInvalidEndpoints(ignoreInvalidEndpoints); + } executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "RecipientList", this); if (isParallelProcessing() && executorService == null) { @@ -159,6 +164,16 @@ public class RecipientListDefinition<Typ setStrategyRef(aggregationStrategyRef); return this; } + + /** + * Ignore the invalidate endpoint exception when try to create a producer with that endpoint + * + * @return the builder + */ + public RecipientListDefinition<Type> ignoreInvalidEndpoints() { + setIgnoreInvalidEndpoints(true); + return this; + } /** * Doing the recipient list work in parallel @@ -227,6 +242,14 @@ public class RecipientListDefinition<Typ public void setExecutorServiceRef(String executorServiceRef) { this.executorServiceRef = executorServiceRef; } + + public Boolean isIgnoreInvalidEndpoints() { + return ignoreInvalidEndpoints; + } + + public void setIgnoreInvalidEndpoints(Boolean ignoreInvalidEndpoints) { + this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; + } public Boolean isStopOnException() { return stopOnException; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=943458&r1=943457&r2=943458&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Wed May 12 11:54:53 2010 @@ -36,6 +36,8 @@ import org.apache.camel.processor.aggreg import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import static org.apache.camel.util.ObjectHelper.notNull; @@ -48,12 +50,14 @@ import static org.apache.camel.util.Obje * @version $Revision$ */ public class RecipientList extends ServiceSupport implements Processor { + private static final transient Log LOG = LogFactory.getLog(RecipientList.class); private final CamelContext camelContext; private ProducerCache producerCache; private Expression expression; private final String delimiter; private boolean parallelProcessing; private boolean stopOnException; + private boolean ignoreInvalidEndpoints; private ExecutorService executorService; private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy(); @@ -110,11 +114,19 @@ public class RecipientList extends Servi List<Processor> processors = new ArrayList<Processor>(); while (iter.hasNext()) { Object recipient = iter.next(); - Endpoint endpoint = resolveEndpoint(exchange, recipient); - // acquire producer which we then release later - Producer producer = producerCache.acquireProducer(endpoint); - processors.add(producer); - producers.put(endpoint, producer); + try { + Endpoint endpoint = resolveEndpoint(exchange, recipient); + // acquire producer which we then release later + Producer producer = producerCache.acquireProducer(endpoint); + processors.add(producer); + producers.put(endpoint, producer); + } catch (Exception ex) { + if (isIgnoreInvalidEndpoints()) { + LOG.warn("Get a invalid endpoint with " + recipient , ex); + } else { + throw ex; + } + } } MulticastProcessor mp = new MulticastProcessor(exchange.getContext(), processors, getAggregationStrategy(), @@ -150,6 +162,14 @@ public class RecipientList extends Servi protected void doStop() throws Exception { ServiceHelper.stopService(producerCache); } + + public boolean isIgnoreInvalidEndpoints() { + return ignoreInvalidEndpoints; + } + + public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) { + this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; + } public boolean isParallelProcessing() { return parallelProcessing; Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java?rev=943458&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java Wed May 12 11:54:53 2010 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ResolveEndpointFailedException; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +import static org.apache.camel.language.simple.SimpleLanguage.simple; + +public class RecipientListIgnoreInvalidEndpointsTest extends ContextTestSupport { + + public void testRecipientListWithIgnoreInvalidEndpointsOption() throws Exception { + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedBodiesReceived("Hello World"); + + MockEndpoint endpointA = getMockEndpoint("mock:endpointA"); + endpointA.expectedBodiesReceived("Hello a"); + + template.requestBody("direct:startA", "Hello World", String.class); + + assertMockEndpointsSatisfied(); + } + + public void testRecipientListWithoutIgnoreInvalidEndpointsOption() throws Exception { + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(0); + + MockEndpoint endpointA = getMockEndpoint("mock:endpointA"); + endpointA.expectedMessageCount(0); + + try { + template.requestBody("direct:startB", "Hello World", String.class); + fail("Expect the exception here."); + } catch (Exception ex) { + assertTrue("Get a wrong cause of the exception", ex.getCause() instanceof ResolveEndpointFailedException); + } + + assertMockEndpointsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:startA").recipientList(simple("mock:result,fail:endpoint,direct:a")).ignoreInvalidEndpoints(); + + from("direct:startB").recipientList(simple("mock:result,fail:endpoint,direct:a")); + + from("direct:a").transform(constant("Hello a")).to("mock:endpointA"); + + } + }; + } + + +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date