Repository: camel Updated Branches: refs/heads/master be3bae225 -> eefc37f90
CAMEL-8977: Enrich and PollEnrich - Add option ignoreInvalidEndpoint Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/97693494 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/97693494 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/97693494 Branch: refs/heads/master Commit: 9769349461a315e7e5b1f0c4ad6da9e1008a17be Parents: be3bae2 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jul 17 08:57:43 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 17 08:57:43 2015 +0200 ---------------------------------------------------------------------- .../apache/camel/model/EnrichDefinition.java | 24 +++++++++- .../camel/model/PollEnrichDefinition.java | 22 +++++++++ .../org/apache/camel/processor/Enricher.java | 20 +++++++- .../apache/camel/processor/PollEnricher.java | 20 +++++++- ...richExpressionIgnoreInvalidEndpointTest.java | 48 ++++++++++++++++++++ ...richExpressionIgnoreInvalidEndpointTest.java | 46 +++++++++++++++++++ 6 files changed, 174 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/97693494/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java index 2aab5d9..1d05f60 100644 --- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java @@ -55,6 +55,8 @@ public class EnrichDefinition extends NoOutputExpressionNode { private Boolean shareUnitOfWork; @XmlAttribute private Integer cacheSize; + @XmlAttribute + private Boolean ignoreInvalidEndpoint; public EnrichDefinition() { this(null); @@ -77,13 +79,13 @@ public class 
EnrichDefinition extends NoOutputExpressionNode { @Override public Processor createProcessor(RouteContext routeContext) throws Exception { - // lookup endpoint - Expression exp = getExpression().createExpression(routeContext); boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork(); + boolean isIgnoreInvalidEndpoint = getIgnoreInvalidEndpoint() != null && getIgnoreInvalidEndpoint(); Enricher enricher = new Enricher(exp); enricher.setShareUnitOfWork(isShareUnitOfWork); + enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint); AggregationStrategy strategy = createAggregationStrategy(routeContext); if (strategy != null) { enricher.setAggregationStrategy(strategy); @@ -191,6 +193,16 @@ public class EnrichDefinition extends NoOutputExpressionNode { return this; } + /** + * Ignore the invalid endpoint exception when trying to create a producer with that endpoint + * + * @return the builder + */ + public EnrichDefinition ignoreInvalidEndpoint() { + setIgnoreInvalidEndpoint(true); + return this; + } + // Properties // ------------------------------------------------------------------------- @@ -258,4 +270,12 @@ public class EnrichDefinition extends NoOutputExpressionNode { public void setCacheSize(Integer cacheSize) { this.cacheSize = cacheSize; } + + public Boolean getIgnoreInvalidEndpoint() { + return ignoreInvalidEndpoint; + } + + public void setIgnoreInvalidEndpoint(Boolean ignoreInvalidEndpoint) { + this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/97693494/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java index e2e2b57..f200051 100644 --- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java +++ 
b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java @@ -55,6 +55,8 @@ public class PollEnrichDefinition extends NoOutputExpressionNode { private AggregationStrategy aggregationStrategy; @XmlAttribute private Integer cacheSize; + @XmlAttribute + private Boolean ignoreInvalidEndpoint; public PollEnrichDefinition() { } @@ -79,6 +81,7 @@ public class PollEnrichDefinition extends NoOutputExpressionNode { // if no timeout then we should block, and there use a negative timeout long time = timeout != null ? timeout : -1; + boolean isIgnoreInvalidEndpoint = getIgnoreInvalidEndpoint() != null && getIgnoreInvalidEndpoint(); Expression exp = getExpression().createExpression(routeContext); PollEnricher enricher = new PollEnricher(exp, time); @@ -95,6 +98,7 @@ public class PollEnrichDefinition extends NoOutputExpressionNode { if (getCacheSize() != null) { enricher.setCacheSize(getCacheSize()); } + enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint); return enricher; } @@ -202,6 +206,16 @@ public class PollEnrichDefinition extends NoOutputExpressionNode { return this; } + /** + * Ignore the invalid endpoint exception when trying to create a polling consumer with that endpoint + * + * @return the builder + */ + public PollEnrichDefinition ignoreInvalidEndpoint() { + setIgnoreInvalidEndpoint(true); + return this; + } + // Properties // ------------------------------------------------------------------------- @@ -269,4 +283,12 @@ public class PollEnrichDefinition extends NoOutputExpressionNode { public void setCacheSize(Integer cacheSize) { this.cacheSize = cacheSize; } + + public Boolean getIgnoreInvalidEndpoint() { + return ignoreInvalidEndpoint; + } + + public void setIgnoreInvalidEndpoint(Boolean ignoreInvalidEndpoint) { + this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/97693494/camel-core/src/main/java/org/apache/camel/processor/Enricher.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java index cbdf104..06e0b8f 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java @@ -66,6 +66,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware, private boolean aggregateOnException; private boolean shareUnitOfWork; private int cacheSize; + private boolean ignoreInvalidEndpoint; public Enricher(Expression expression) { this.expression = expression; @@ -119,6 +120,14 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware, this.cacheSize = cacheSize; } + public boolean isIgnoreInvalidEndpoint() { + return ignoreInvalidEndpoint; + } + + public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) { + this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } @@ -141,13 +150,20 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware, final Endpoint endpoint; // use dynamic endpoint so calculate the endpoint to use + Object recipient = null; try { - Object recipient = expression.evaluate(exchange, Object.class); + recipient = expression.evaluate(exchange, Object.class); endpoint = resolveEndpoint(exchange, recipient); // acquire the consumer from the cache producer = producerCache.acquireProducer(endpoint); } catch (Throwable e) { - exchange.setException(e); + if (isIgnoreInvalidEndpoint()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Endpoint uri is invalid: " + recipient + ". 
This exception will be ignored.", e); + } + } else { + exchange.setException(e); + } callback.done(true); return true; } http://git-wip-us.apache.org/repos/asf/camel/blob/97693494/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java index de3c7b4..bb52bff 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -61,6 +61,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw private long timeout; private boolean aggregateOnException; private int cacheSize; + private boolean ignoreInvalidEndpoint; /** * Creates a new {@link PollEnricher}. @@ -141,6 +142,14 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw this.cacheSize = cacheSize; } + public boolean isIgnoreInvalidEndpoint() { + return ignoreInvalidEndpoint; + } + + public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) { + this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } @@ -172,13 +181,20 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw Endpoint endpoint; // use dynamic endpoint so calculate the endpoint to use + Object recipient = null; try { - Object recipient = expression.evaluate(exchange, Object.class); + recipient = expression.evaluate(exchange, Object.class); endpoint = resolveEndpoint(exchange, recipient); // acquire the consumer from the cache consumer = consumerCache.acquirePollingConsumer(endpoint); } catch (Throwable e) { - exchange.setException(e); + if (isIgnoreInvalidEndpoint()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Endpoint 
uri is invalid: " + recipient + ". This exception will be ignored.", e); + } + } else { + exchange.setException(e); + } callback.done(true); return true; } http://git-wip-us.apache.org/repos/asf/camel/blob/97693494/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionIgnoreInvalidEndpointTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionIgnoreInvalidEndpointTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionIgnoreInvalidEndpointTest.java new file mode 100644 index 0000000..e4a145b --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionIgnoreInvalidEndpointTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.enricher; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class EnrichExpressionIgnoreInvalidEndpointTest extends ContextTestSupport { + + public void testEnrichExpression() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Unknown"); + + template.sendBodyAndHeader("direct:start", "Camel", "source", "direct:foo"); + template.sendBodyAndHeader("direct:start", "Unknown", "source", "unknown"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .enrich().header("source").ignoreInvalidEndpoint() + .to("mock:result"); + + from("direct:foo").transform().constant("Hello World"); + + from("direct:bar").transform().constant("Bye World"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/97693494/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionIgnoreInvalidEndpointTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionIgnoreInvalidEndpointTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionIgnoreInvalidEndpointTest.java new file mode 100644 index 0000000..74a29d3 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionIgnoreInvalidEndpointTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.enricher; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class PollEnrichExpressionIgnoreInvalidEndpointTest extends ContextTestSupport { + + public void testPollEnrichExpression() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Unknown"); + + template.sendBody("seda:foo", "Hello World"); + + template.sendBodyAndHeader("direct:start", null, "source", "seda:foo"); + template.sendBodyAndHeader("direct:start", "Unknown", "source", "unknown"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .pollEnrich().header("source").ignoreInvalidEndpoint() + .to("mock:result"); + } + }; + } +}