Repository: camel Updated Branches: refs/heads/master 52f88a23c -> 4ba43c396
CAMEL-10004: PollEnrich now supports bridge error handler of the consumer, so the route error handler can react on caught exception during polling. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4ba43c39 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4ba43c39 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4ba43c39 Branch: refs/heads/master Commit: 4ba43c39613af9ad9ded6cbcfdb06c30a349ee93 Parents: 52f88a2 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Jun 1 11:06:19 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Jun 1 12:10:24 2016 +0200 ---------------------------------------------------------------------- .../camel/impl/EventDrivenPollingConsumer.java | 4 + .../apache/camel/processor/PollEnricher.java | 63 +++++++++++++ .../PollEnrichBridgeErrorHandlerTest.java | 97 ++++++++++++++++++++ 3 files changed, 164 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4ba43c39/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java index 624e9b2..aebe538 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java @@ -174,6 +174,10 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement this.interruptedExceptionHandler = interruptedExceptionHandler; } + public Consumer getDelegateConsumer() { + return consumer; + } + protected void handleInterruptedException(InterruptedException e) { getInterruptedExceptionHandler().handleException(e); } http://git-wip-us.apache.org/repos/asf/camel/blob/4ba43c39/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java index 691e55d..d83d964 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -21,14 +21,19 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExchangeException; +import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.PollingConsumer; +import org.apache.camel.impl.BridgeExceptionHandlerToErrorHandler; import org.apache.camel.impl.ConsumerCache; +import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.impl.EmptyConsumerCache; +import org.apache.camel.impl.EventDrivenPollingConsumer; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.spi.EndpointUtilizationStatistics; +import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -208,6 +213,21 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw return true; } + // grab the real delegate consumer that performs the actual polling + Consumer delegate = consumer; + if (consumer instanceof EventDrivenPollingConsumer) { + delegate = ((EventDrivenPollingConsumer) consumer).getDelegateConsumer(); + } + + // is the consumer bridging the error handler? + boolean bridgeErrorHandler = false; + if (delegate instanceof DefaultConsumer) { + ExceptionHandler handler = ((DefaultConsumer) delegate).getExceptionHandler(); + if (handler != null && handler instanceof BridgeExceptionHandlerToErrorHandler) { + bridgeErrorHandler = true; + } + } + Exchange resourceExchange; try { if (timeout < 0) { @@ -235,9 +255,21 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw consumerCache.releasePollingConsumer(endpoint, consumer); } + // remember current redelivery stats + Object redeliveried = exchange.getIn().getHeader(Exchange.REDELIVERED); + Object redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER); + Object redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER); + + // if we are bridging error handler and failed then remember the caused exception + Throwable cause = null; + if (resourceExchange != null && bridgeErrorHandler) { + cause = resourceExchange.getException(); + } + try { if (!isAggregateOnException() && (resourceExchange != null && resourceExchange.isFailed())) { // copy resource exchange onto original exchange (preserving pattern) + // and preserve redelivery headers copyResultsPreservePattern(exchange, resourceExchange); } else { prepareResult(exchange); @@ -256,6 +288,37 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw } } + // if we failed then restore caused exception + if (cause != null) { + // restore caused exception + exchange.setException(cause); + // remove the exhausted marker as we want to be able to perform redeliveries with the error handler + exchange.removeProperties(Exchange.REDELIVERY_EXHAUSTED); + + // preserve the redelivery stats + if (redeliveried != null) { + if (exchange.hasOut()) { + exchange.getOut().setHeader(Exchange.REDELIVERED, redeliveried); + } else { + exchange.getIn().setHeader(Exchange.REDELIVERED, redeliveried); + } + } + if (redeliveryCounter != null) { + if (exchange.hasOut()) { + exchange.getOut().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter); + } else { + exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter); + } + } + if (redeliveryMaxCounter != null) { + if (exchange.hasOut()) { + exchange.getOut().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter); + } else { + exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter); + } + } + } + // set header with the uri of the endpoint enriched so we can use that for tracing etc if (exchange.hasOut()) { exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); http://git-wip-us.apache.org/repos/asf/camel/blob/4ba43c39/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java new file mode 100644 index 0000000..6e47cfc --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.enricher; + +import org.apache.camel.Consumer; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; +import org.apache.camel.spi.PollingConsumerPollStrategy; +import org.junit.Test; + +public class PollEnrichBridgeErrorHandlerTest extends ContextTestSupport { + + private MyPollingStrategy myPoll = new MyPollingStrategy(); + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + jndi.bind("myPoll", myPoll); + return jndi; + } + + @Test + public void testPollEnrichBridgeErrorHandler() throws Exception { + getMockEndpoint("mock:dead").expectedMessageCount(1); + getMockEndpoint("mock:result").expectedMessageCount(0); + + template.sendBody("seda:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + assertEquals(1 + 3, myPoll.getCounter()); + + Exception caught = getMockEndpoint("mock:dead").getExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); + assertNotNull(caught); + assertEquals("Something went wrong", caught.getMessage()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // try at most 3 times and if still failing move to DLQ + errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(3).redeliveryDelay(0)); + + from("seda:start") + // bridge the error handler when doing a polling so we can let Camel's error handler decide what to do + .pollEnrich("file:target/foo?pollStrategy=#myPoll&consumer.bridgeErrorHandler=true", 10000, new UseLatestAggregationStrategy()) + .to("mock:result"); + } + }; + } + + private class MyPollingStrategy implements PollingConsumerPollStrategy { + + private int counter; + + @Override + public boolean begin(Consumer consumer, Endpoint endpoint) { + counter++; + throw new IllegalArgumentException("Something went wrong"); + } + + @Override + public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) { + // noop + } + + @Override + public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception cause) throws Exception { + return false; + } + + public int getCounter() { + return counter; + } + } + +}