Repository: camel Updated Branches: refs/heads/master 34db1920f -> 3626a0391
CAMEL-8740: Add event ExchangeFailureHandlingEvent which allows custom components to track when an Exchange is being sent to DLQ and allow the component to alter the Exchange etc. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3626a039 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3626a039 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3626a039 Branch: refs/heads/master Commit: 3626a0391a54555d0cb1e476eea9ad3f312d18ff Parents: 34db192 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue May 5 20:09:14 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed May 6 07:57:09 2015 +0200 ---------------------------------------------------------------------- .../management/event/DefaultEventFactory.java | 9 +++ .../event/ExchangeFailureHandlingEvent.java | 61 ++++++++++++++++++++ .../apache/camel/processor/CatchProcessor.java | 5 ++ .../camel/processor/RedeliveryErrorHandler.java | 6 +- .../java/org/apache/camel/spi/EventFactory.java | 19 ++++++ .../java/org/apache/camel/util/EventHelper.java | 34 +++++++++++ .../EventNotifierFailureHandledEventsTest.java | 56 +++++++++++------- .../EventNotifierRedeliveryEventsTest.java | 16 ++--- 8 files changed, 177 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3626a039/camel-core/src/main/java/org/apache/camel/management/event/DefaultEventFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/event/DefaultEventFactory.java b/camel-core/src/main/java/org/apache/camel/management/event/DefaultEventFactory.java index 0f78475..4bf7f7f 100644 --- a/camel-core/src/main/java/org/apache/camel/management/event/DefaultEventFactory.java +++ b/camel-core/src/main/java/org/apache/camel/management/event/DefaultEventFactory.java @@ -93,6 +93,15 
@@ public class DefaultEventFactory implements EventFactory { return new ExchangeFailedEvent(exchange); } + public EventObject createExchangeFailureHandlingEvent(Exchange exchange, Processor failureHandler, boolean deadLetterChannel, String deadLetterUri) { + // unwrap delegate processor + Processor handler = failureHandler; + if (handler instanceof DelegateProcessor) { + handler = ((DelegateProcessor) handler).getProcessor(); + } + return new ExchangeFailureHandlingEvent(exchange, handler, deadLetterChannel, deadLetterUri); + } + public EventObject createExchangeFailureHandledEvent(Exchange exchange, Processor failureHandler, boolean deadLetterChannel, String deadLetterUri) { // unwrap delegate processor http://git-wip-us.apache.org/repos/asf/camel/blob/3626a039/camel-core/src/main/java/org/apache/camel/management/event/ExchangeFailureHandlingEvent.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/event/ExchangeFailureHandlingEvent.java b/camel-core/src/main/java/org/apache/camel/management/event/ExchangeFailureHandlingEvent.java new file mode 100644 index 0000000..614b308 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/management/event/ExchangeFailureHandlingEvent.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.event; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.util.URISupport; + +/** + * @version + */ +public class ExchangeFailureHandlingEvent extends AbstractExchangeEvent { + private static final long serialVersionUID = -7554809462006009549L; + + private final transient Processor failureHandler; + private final boolean deadLetterChannel; + private final String deadLetterUri; + + public ExchangeFailureHandlingEvent(Exchange source, Processor failureHandler, boolean deadLetterChannel, String deadLetterUri) { + super(source); + this.failureHandler = failureHandler; + this.deadLetterChannel = deadLetterChannel; + this.deadLetterUri = deadLetterUri; + } + + public Processor getFailureHandler() { + return failureHandler; + } + + public boolean isDeadLetterChannel() { + return deadLetterChannel; + } + + public String getDeadLetterUri() { + return deadLetterUri; + } + + @Override + public String toString() { + if (isDeadLetterChannel()) { + String uri = URISupport.sanitizeUri(deadLetterUri); + return getExchange().getExchangeId() + " exchange failed: " + getExchange() + " but is being handled by dead letter channel: " + uri; + } else { + return getExchange().getExchangeId() + " exchange failed: " + getExchange() + " but is being processed by failure processor: " + failureHandler; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3626a039/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java index 654cfa4..c2ea27f 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java @@ -99,6 +99,11 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, new Object[]{handled, e.getClass().getName(), e.getMessage()}); } + if (handled) { + // emit event that the failure is being handled + EventHelper.notifyExchangeFailureHandling(exchange.getContext(), exchange, processor, false, null); + } + boolean sync = processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { if (handled) { http://git-wip-us.apache.org/repos/asf/camel/blob/3626a039/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java index e1de1d2..062ad6a 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java @@ -907,6 +907,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId()); } + // fire event as we had a failure processor to handle it, which there is a event for + final boolean deadLetterChannel = processor == data.deadLetterProcessor; + + EventHelper.notifyExchangeFailureHandling(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri); + // the failure processor could also be asynchronous 
AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor); sync = afp.process(exchange, new AsyncCallback() { @@ -915,7 +920,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme try { prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue); // fire event as we had a failure processor to handle it, which there is a event for - boolean deadLetterChannel = processor == data.deadLetterProcessor; EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri); } finally { // if the fault was handled asynchronously, this should be reflected in the callback as well http://git-wip-us.apache.org/repos/asf/camel/blob/3626a039/camel-core/src/main/java/org/apache/camel/spi/EventFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/EventFactory.java b/camel-core/src/main/java/org/apache/camel/spi/EventFactory.java index e42d178..9253d83 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/EventFactory.java +++ b/camel-core/src/main/java/org/apache/camel/spi/EventFactory.java @@ -161,7 +161,26 @@ public interface EventFactory { /** * Creates an {@link EventObject} when an {@link org.apache.camel.Exchange} has failed + * but is being handled by the Camel error handlers such as an dead letter channel, or a doTry .. doCatch block. + * <p/> + * This event is triggered <b>before</b> sending the the failure handler, where as + * <tt>createExchangeFailureHandledEvent</tt> if the event <b>after</b>. 
+ * + * @param exchange the exchange + * @param failureHandler the failure handler such as moving the message to a dead letter queue + * @param deadLetterChannel whether it was a dead letter channel or not handling the failure + * @param deadLetterUri the dead letter uri, if its a dead letter channel + * @return the created event + */ + EventObject createExchangeFailureHandlingEvent(Exchange exchange, Processor failureHandler, + boolean deadLetterChannel, String deadLetterUri); + + /** + * Creates an {@link EventObject} when an {@link org.apache.camel.Exchange} has failed * but was handled by the Camel error handlers such as an dead letter channel, or a doTry .. doCatch block. + * <p/> + * This event is triggered <b>after</b> the exchange was sent to failure handler, where as + * <tt>createExchangeFailureHandlingEvent</tt> if the event <b>before</b>. * * @param exchange the exchange * @param failureHandler the failure handler such as moving the message to a dead letter queue http://git-wip-us.apache.org/repos/asf/camel/blob/3626a039/camel-core/src/main/java/org/apache/camel/util/EventHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/EventHelper.java b/camel-core/src/main/java/org/apache/camel/util/EventHelper.java index cabe364..98455f3 100644 --- a/camel-core/src/main/java/org/apache/camel/util/EventHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/EventHelper.java @@ -478,6 +478,40 @@ public final class EventHelper { } } + public static void notifyExchangeFailureHandling(CamelContext context, Exchange exchange, Processor failureHandler, + boolean deadLetterChannel, String deadLetterUri) { + if (exchange.getProperty(Exchange.NOTIFY_EVENT, false, Boolean.class)) { + // do not generate events for an notify event + return; + } + + ManagementStrategy management = context.getManagementStrategy(); + if (management == null) { + return; + } + + List<EventNotifier> 
notifiers = management.getEventNotifiers(); + if (notifiers == null || notifiers.isEmpty()) { + return; + } + + for (EventNotifier notifier : notifiers) { + if (notifier.isIgnoreExchangeEvents() || notifier.isIgnoreExchangeFailedEvents()) { + continue; + } + + EventFactory factory = management.getEventFactory(); + if (factory == null) { + return; + } + EventObject event = factory.createExchangeFailureHandlingEvent(exchange, failureHandler, deadLetterChannel, deadLetterUri); + if (event == null) { + return; + } + doNotifyEvent(notifier, event); + } + } + public static void notifyExchangeFailureHandled(CamelContext context, Exchange exchange, Processor failureHandler, boolean deadLetterChannel, String deadLetterUri) { if (exchange.getProperty(Exchange.NOTIFY_EVENT, false, Boolean.class)) { http://git-wip-us.apache.org/repos/asf/camel/blob/3626a039/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java b/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java index 0b2e313..ab2a373 100644 --- a/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java @@ -29,6 +29,7 @@ import org.apache.camel.management.event.CamelContextStartingEvent; import org.apache.camel.management.event.ExchangeCompletedEvent; import org.apache.camel.management.event.ExchangeCreatedEvent; import org.apache.camel.management.event.ExchangeFailureHandledEvent; +import org.apache.camel.management.event.ExchangeFailureHandlingEvent; import org.apache.camel.management.event.ExchangeSendingEvent; import org.apache.camel.management.event.ExchangeSentEvent; import 
org.apache.camel.management.event.RouteAddedEvent; @@ -92,17 +93,22 @@ public class EventNotifierFailureHandledEventsTest extends ContextTestSupport { template.sendBody("direct:start", "Hello World"); assertMockEndpointsSatisfied(); - assertEquals(11, events.size()); + assertEquals(12, events.size()); assertIsInstanceOf(CamelContextStartingEvent.class, events.get(0)); assertIsInstanceOf(RouteAddedEvent.class, events.get(1)); assertIsInstanceOf(RouteStartedEvent.class, events.get(2)); assertIsInstanceOf(CamelContextStartedEvent.class, events.get(3)); assertIsInstanceOf(ExchangeSendingEvent.class, events.get(4)); assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(5)); - assertIsInstanceOf(ExchangeSendingEvent.class, events.get(6)); - assertIsInstanceOf(ExchangeSentEvent.class, events.get(7)); - ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(8)); + ExchangeFailureHandlingEvent e0 = assertIsInstanceOf(ExchangeFailureHandlingEvent.class, events.get(6)); + assertEquals("should be DLC", true, e0.isDeadLetterChannel()); + assertEquals("mock://dead", e0.getDeadLetterUri()); + + assertIsInstanceOf(ExchangeSendingEvent.class, events.get(7)); + assertIsInstanceOf(ExchangeSentEvent.class, events.get(8)); + + ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(9)); assertEquals("should be DLC", true, e.isDeadLetterChannel()); assertTrue("should be marked as failure handled", e.isHandled()); assertFalse("should not be continued", e.isContinued()); @@ -111,10 +117,10 @@ public class EventNotifierFailureHandledEventsTest extends ContextTestSupport { assertEquals("mock://dead", e.getDeadLetterUri()); // dead letter channel will mark the exchange as completed - assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(9)); + assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(10)); // and the last event should be the direct:start - 
assertIsInstanceOf(ExchangeSentEvent.class, events.get(10)); - ExchangeSentEvent sent = (ExchangeSentEvent) events.get(10); + assertIsInstanceOf(ExchangeSentEvent.class, events.get(11)); + ExchangeSentEvent sent = (ExchangeSentEvent) events.get(11); assertEquals("direct://start", sent.getEndpoint().getEndpointUri()); } @@ -133,26 +139,30 @@ public class EventNotifierFailureHandledEventsTest extends ContextTestSupport { template.sendBody("direct:start", "Hello World"); assertMockEndpointsSatisfied(); - assertEquals(11, events.size()); + assertEquals(12, events.size()); assertIsInstanceOf(CamelContextStartingEvent.class, events.get(0)); assertIsInstanceOf(RouteAddedEvent.class, events.get(1)); assertIsInstanceOf(RouteStartedEvent.class, events.get(2)); assertIsInstanceOf(CamelContextStartedEvent.class, events.get(3)); assertIsInstanceOf(ExchangeSendingEvent.class, events.get(4)); assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(5)); - assertIsInstanceOf(ExchangeSendingEvent.class, events.get(6)); - assertIsInstanceOf(ExchangeSentEvent.class, events.get(7)); - ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(8)); + ExchangeFailureHandlingEvent e0 = assertIsInstanceOf(ExchangeFailureHandlingEvent.class, events.get(6)); + assertEquals("should NOT be DLC", false, e0.isDeadLetterChannel()); + + assertIsInstanceOf(ExchangeSendingEvent.class, events.get(7)); + assertIsInstanceOf(ExchangeSentEvent.class, events.get(8)); + + ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(9)); assertEquals("should NOT be DLC", false, e.isDeadLetterChannel()); assertTrue("should be marked as failure handled", e.isHandled()); assertFalse("should not be continued", e.isContinued()); // onException will handle the exception - assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(9)); + assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(10)); // and the last event should be the 
direct:start - assertIsInstanceOf(ExchangeSentEvent.class, events.get(10)); - ExchangeSentEvent sent = (ExchangeSentEvent) events.get(10); + assertIsInstanceOf(ExchangeSentEvent.class, events.get(11)); + ExchangeSentEvent sent = (ExchangeSentEvent) events.get(11); assertEquals("direct://start", sent.getEndpoint().getEndpointUri()); } @@ -174,26 +184,30 @@ public class EventNotifierFailureHandledEventsTest extends ContextTestSupport { template.sendBody("direct:start", "Hello World"); assertMockEndpointsSatisfied(); - assertEquals(11, events.size()); + assertEquals(12, events.size()); assertIsInstanceOf(CamelContextStartingEvent.class, events.get(0)); assertIsInstanceOf(RouteAddedEvent.class, events.get(1)); assertIsInstanceOf(RouteStartedEvent.class, events.get(2)); assertIsInstanceOf(CamelContextStartedEvent.class, events.get(3)); assertIsInstanceOf(ExchangeSendingEvent.class, events.get(4)); assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(5)); - assertIsInstanceOf(ExchangeSendingEvent.class, events.get(6)); - assertIsInstanceOf(ExchangeSentEvent.class, events.get(7)); - ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(8)); + ExchangeFailureHandlingEvent e0 = assertIsInstanceOf(ExchangeFailureHandlingEvent.class, events.get(6)); + assertEquals("should NOT be DLC", false, e0.isDeadLetterChannel()); + + assertIsInstanceOf(ExchangeSendingEvent.class, events.get(7)); + assertIsInstanceOf(ExchangeSentEvent.class, events.get(8)); + + ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(9)); assertEquals("should NOT be DLC", false, e.isDeadLetterChannel()); assertFalse("should not be marked as failure handled as it was continued instead", e.isHandled()); assertTrue("should be continued", e.isContinued()); // onException will handle the exception - assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(9)); + assertIsInstanceOf(ExchangeCompletedEvent.class, 
events.get(10)); // and the last event should be the direct:start - assertIsInstanceOf(ExchangeSentEvent.class, events.get(10)); - ExchangeSentEvent sent = (ExchangeSentEvent) events.get(10); + assertIsInstanceOf(ExchangeSentEvent.class, events.get(11)); + ExchangeSentEvent sent = (ExchangeSentEvent) events.get(11); assertEquals("direct://start", sent.getEndpoint().getEndpointUri()); } http://git-wip-us.apache.org/repos/asf/camel/blob/3626a039/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java b/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java index 061b04e..cf146ea 100644 --- a/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java @@ -27,6 +27,7 @@ import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.management.event.ExchangeCompletedEvent; import org.apache.camel.management.event.ExchangeCreatedEvent; import org.apache.camel.management.event.ExchangeFailureHandledEvent; +import org.apache.camel.management.event.ExchangeFailureHandlingEvent; import org.apache.camel.management.event.ExchangeRedeliveryEvent; import org.apache.camel.management.event.ExchangeSendingEvent; import org.apache.camel.management.event.ExchangeSentEvent; @@ -92,7 +93,7 @@ public class EventNotifierRedeliveryEventsTest extends ContextTestSupport { assertMockEndpointsSatisfied(); assertTrue(oneExchangeDone.matchesMockWaitTime()); - assertEquals(11, events.size()); + assertEquals(12, events.size()); assertIsInstanceOf(ExchangeSendingEvent.class, events.get(0)); assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(1)); @@ -104,11 +105,12 @@ public class 
EventNotifierRedeliveryEventsTest extends ContextTestSupport { assertEquals(3, e.getAttempt()); e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(5)); assertEquals(4, e.getAttempt()); - assertIsInstanceOf(ExchangeSendingEvent.class, events.get(6)); - assertIsInstanceOf(ExchangeSentEvent.class, events.get(7)); - assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(8)); - assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(9)); - assertIsInstanceOf(ExchangeSentEvent.class, events.get(10)); + assertIsInstanceOf(ExchangeFailureHandlingEvent.class, events.get(6)); + assertIsInstanceOf(ExchangeSendingEvent.class, events.get(7)); + assertIsInstanceOf(ExchangeSentEvent.class, events.get(8)); + assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(9)); + assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(10)); + assertIsInstanceOf(ExchangeSentEvent.class, events.get(11)); } public void testExchangeRedeliveryAsync() throws Exception { @@ -127,7 +129,7 @@ public class EventNotifierRedeliveryEventsTest extends ContextTestSupport { assertMockEndpointsSatisfied(); assertTrue(oneExchangeDone.matchesMockWaitTime()); - assertEquals(11, events.size()); + assertEquals(12, events.size()); assertIsInstanceOf(ExchangeSendingEvent.class, events.get(0)); assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(1));