Repository: camel Updated Branches: refs/heads/camel-2.15.x d408674e4 -> 22766a51c refs/heads/master 38fd41c9e -> bd3962033
CAMEL-8634: Wire tap - Should emit event notification about sending to tapped endpoint Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bd396203 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bd396203 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bd396203 Branch: refs/heads/master Commit: bd3962033df1f6a46e1bfe3820faadd7e4d9c49f Parents: 38fd41c Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Apr 14 16:25:08 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Apr 14 16:25:17 2015 +0200 ---------------------------------------------------------------------- .../camel/processor/WireTapProcessor.java | 12 +++++-- .../EventNotifierExchangeSentParallelTest.java | 2 ++ .../EventNotifierExchangeSentTest.java | 38 ++++++++++++++++++++ 3 files changed, 50 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/bd396203/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 532608d..a74e663 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -37,9 +37,11 @@ import org.apache.camel.impl.DefaultExchange; import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.EventHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; +import org.apache.camel.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +102,7 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, AsyncProcessorHelper.process(this, exchange); } - public boolean process(Exchange exchange, final AsyncCallback callback) { + public boolean process(final Exchange exchange, final AsyncCallback callback) { if (!isStarted()) { throw new IllegalStateException("WireTapProcessor has not been started: " + this); } @@ -120,14 +122,20 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, // send the exchange to the destination using an executor service executorService.submit(new Callable<Exchange>() { public Exchange call() throws Exception { + final StopWatch watch = new StopWatch(); try { + EventHelper.notifyExchangeSending(wireTapExchange.getContext(), wireTapExchange, destination); LOG.debug(">>>> (wiretap) {} {}", destination, wireTapExchange); processor.process(wireTapExchange); } catch (Throwable e) { LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + destination + ". This exception will be ignored.", e); + } finally { + // emit event that the exchange was sent to the endpoint + long timeTaken = watch.stop(); + EventHelper.notifyExchangeSent(wireTapExchange.getContext(), wireTapExchange, destination, timeTaken); } return wireTapExchange; - }; + } }); // continue routing this synchronously http://git-wip-us.apache.org/repos/asf/camel/blob/bd396203/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java index a84ec1a..6131bda 100644 --- a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentParallelTest.java @@ -67,6 +67,8 @@ public class EventNotifierExchangeSentParallelTest extends EventNotifierExchange from("direct:foo").recipientList(header("foo")).parallelProcessing(); from("direct:cool").delay(1000); + + from("direct:tap").wireTap("log:foo").to("mock:result"); } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/bd396203/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java index 888c2f7..8428258 100644 --- a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeSentTest.java @@ -145,6 +145,42 @@ public class EventNotifierExchangeSentTest extends ContextTestSupport { assertEquals("direct://foo", e11.getEndpoint().getEndpointUri()); } + public void testExchangeWireTap() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:tap", "Hello World"); + + assertMockEndpointsSatisfied(); + + // give it time to complete + Thread.sleep(200); + + assertEquals(6, events.size()); + + // we should find log:foo which we tapped + // which runs async so they can be in random order + boolean found = false; + boolean found2 = false; + for (EventObject event : events) { + if (event instanceof ExchangeSendingEvent) { + ExchangeSendingEvent sending = (ExchangeSendingEvent) event; + String uri = sending.getEndpoint().getEndpointUri(); + if ("log://foo".equals(uri)) { + found = true; + } + } else if (event instanceof ExchangeSentEvent) { + ExchangeSentEvent sent = (ExchangeSentEvent) event; + String uri = sent.getEndpoint().getEndpointUri(); + if ("log://foo".equals(uri)) { + found2 = true; + } + } + } + + assertTrue("We should find log:foo being wire tapped", found); + assertTrue("We should find log:foo being wire tapped", found2); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -157,6 +193,8 @@ public class EventNotifierExchangeSentTest extends ContextTestSupport { from("direct:foo").recipientList().header("foo"); from("direct:cool").delay(1000); + + from("direct:tap").wireTap("log:foo").to("mock:result"); } }; }