CAMEL-9795: camel-zipkin - Reuse existing span for complex EIPs like multicast.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6c81334c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6c81334c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6c81334c Branch: refs/heads/master Commit: 6c81334c1f4274c4c3f1392296f985bc23e9e084 Parents: 22b7b17 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Apr 5 11:42:03 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Apr 5 13:20:49 2016 +0200 ---------------------------------------------------------------------- .../camel/processor/CamelInternalProcessor.java | 2 +- .../zipkin/ZipkinClientRequestAdapter.java | 7 ++ .../zipkin/ZipkinClientResponseAdaptor.java | 7 ++ .../org/apache/camel/zipkin/ZipkinHelper.java | 28 ++++++++ .../zipkin/ZipkinServerRequestAdapter.java | 6 ++ .../zipkin/ZipkinServerResponseAdapter.java | 7 ++ .../org/apache/camel/zipkin/ZipkinTracer.java | 27 ++++++-- .../zipkin/ZipkinSimpleLogStreamsRouteTest.java | 69 ++++++++++++++++++++ .../src/main/java/sample/camel/ClientRoute.java | 5 +- .../main/java/sample/camel/Service1Route.java | 10 +-- .../main/java/sample/camel/Service2Route.java | 7 +- 11 files changed, 159 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 7ce5a4d..d9dc7a1 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -774,7 +774,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor { @Override public void after(Exchange exchange, StreamCache sc) throws Exception { - Object body = null; + Object body; if (exchange.hasOut()) { body = exchange.getOut().getBody(); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientRequestAdapter.java ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientRequestAdapter.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientRequestAdapter.java index a8f2207..ecffa7e 100644 --- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientRequestAdapter.java +++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientRequestAdapter.java @@ -28,9 +28,12 @@ import com.github.kristofa.brave.SpanId; import com.github.kristofa.brave.internal.Nullable; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.StreamCache; import org.apache.camel.util.MessageHelper; import org.apache.camel.util.URISupport; +import static org.apache.camel.zipkin.ZipkinHelper.prepareBodyForLogging; + public final class ZipkinClientRequestAdapter implements ClientRequestAdapter { private final ZipkinTracer eventNotifier; @@ -82,8 +85,12 @@ public final class ZipkinClientRequestAdapter implements ClientRequestAdapter { KeyValueAnnotation key4 = null; if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) { boolean streams = eventNotifier.isIncludeMessageBodyStreams(); + StreamCache cache = prepareBodyForLogging(exchange, streams); String body = MessageHelper.extractBodyForLogging(exchange.hasOut() ? 
exchange.getOut() : exchange.getIn(), "", streams, streams); key4 = KeyValueAnnotation.create("camel.client.exchange.message.request.body", body); + if (cache != null) { + cache.reset(); + } } List<KeyValueAnnotation> list = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientResponseAdaptor.java ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientResponseAdaptor.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientResponseAdaptor.java index ec3711d..803dfcc 100644 --- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientResponseAdaptor.java +++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinClientResponseAdaptor.java @@ -24,9 +24,12 @@ import com.github.kristofa.brave.ClientResponseAdapter; import com.github.kristofa.brave.KeyValueAnnotation; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.StreamCache; import org.apache.camel.util.MessageHelper; import org.apache.camel.util.URISupport; +import static org.apache.camel.zipkin.ZipkinHelper.prepareBodyForLogging; + public class ZipkinClientResponseAdaptor implements ClientResponseAdapter { private final ZipkinTracer eventNotifier; @@ -50,8 +53,12 @@ public class ZipkinClientResponseAdaptor implements ClientResponseAdapter { KeyValueAnnotation key4 = null; if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) { boolean streams = eventNotifier.isIncludeMessageBodyStreams(); + StreamCache cache = prepareBodyForLogging(exchange, streams); String body = MessageHelper.extractBodyForLogging(exchange.hasOut() ? 
exchange.getOut() : exchange.getIn(), "", streams, streams); key4 = KeyValueAnnotation.create("camel.client.exchange.message.response.body", body); + if (cache != null) { + cache.reset(); + } } KeyValueAnnotation key5 = null; http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinHelper.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinHelper.java index 96d86b7..da5e568 100644 --- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinHelper.java +++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinHelper.java @@ -18,6 +18,9 @@ package org.apache.camel.zipkin; import com.github.kristofa.brave.IdConversion; import com.github.kristofa.brave.SpanId; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.StreamCache; /** * Helper class. @@ -34,4 +37,29 @@ public final class ZipkinHelper { return SpanId.create(IdConversion.convertToLong(traceId), IdConversion.convertToLong(spanId), null); } + public static StreamCache prepareBodyForLogging(Exchange exchange, boolean streams) { + if (!streams) { + // no need to prepare if streams is not enabled + return null; + } + + Message message = exchange.hasOut() ? 
exchange.getOut() : exchange.getIn(); + // check if body is already cached + Object body = message.getBody(); + if (body == null) { + return null; + } else if (body instanceof StreamCache) { + StreamCache sc = (StreamCache) body; + // reset so the cache is ready to be used before processing + sc.reset(); + return sc; + } + // cache the body and if we could do that replace it as the new body + StreamCache sc = exchange.getContext().getStreamCachingStrategy().cache(exchange); + if (sc != null) { + message.setBody(sc); + } + return sc; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerRequestAdapter.java ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerRequestAdapter.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerRequestAdapter.java index 3877b5f..1a38f6b 100644 --- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerRequestAdapter.java +++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerRequestAdapter.java @@ -27,10 +27,12 @@ import com.github.kristofa.brave.SpanId; import com.github.kristofa.brave.TraceData; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.StreamCache; import org.apache.camel.util.MessageHelper; import org.apache.camel.util.URISupport; import static org.apache.camel.zipkin.ZipkinHelper.createSpanId; +import static org.apache.camel.zipkin.ZipkinHelper.prepareBodyForLogging; public class ZipkinServerRequestAdapter implements ServerRequestAdapter { @@ -79,8 +81,12 @@ public class ZipkinServerRequestAdapter implements ServerRequestAdapter { KeyValueAnnotation key4 = null; if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) { boolean streams = eventNotifier.isIncludeMessageBodyStreams(); + 
StreamCache cache = prepareBodyForLogging(exchange, streams); String body = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", streams, streams); key4 = KeyValueAnnotation.create("camel.server.exchange.message.request.body", body); + if (cache != null) { + cache.reset(); + } } List<KeyValueAnnotation> list = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerResponseAdapter.java ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerResponseAdapter.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerResponseAdapter.java index 31071df..3ab4fe9 100644 --- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerResponseAdapter.java +++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinServerResponseAdapter.java @@ -24,9 +24,12 @@ import com.github.kristofa.brave.KeyValueAnnotation; import com.github.kristofa.brave.ServerResponseAdapter; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.StreamCache; import org.apache.camel.util.MessageHelper; import org.apache.camel.util.URISupport; +import static org.apache.camel.zipkin.ZipkinHelper.prepareBodyForLogging; + public class ZipkinServerResponseAdapter implements ServerResponseAdapter { private final ZipkinTracer eventNotifier; @@ -56,8 +59,12 @@ public class ZipkinServerResponseAdapter implements ServerResponseAdapter { key4 = KeyValueAnnotation.create("camel.server.exchange.failure", message); } else if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) { boolean streams = eventNotifier.isIncludeMessageBodyStreams(); + StreamCache cache = prepareBodyForLogging(exchange, streams); String body = 
MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", streams, streams); key4 = KeyValueAnnotation.create("camel.server.exchange.message.response.body", body); + if (cache != null) { + cache.reset(); + } } KeyValueAnnotation key5 = null; http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java index 53a5432..f311aaf 100644 --- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java +++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java @@ -50,6 +50,7 @@ import org.apache.camel.model.RouteDefinition; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.RoutePolicyFactory; import org.apache.camel.support.EventNotifierSupport; +import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.EndpointHelper; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; @@ -271,6 +272,8 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R /** * Whether to include message bodies that are stream based in the zipkin traces. * <p/> + * This requires enabling <a href="http://camel.apache.org/stream-caching.html">stream caching</a> on the routes or globally on the CamelContext. + * <p/> * This is not recommended for production usage, or when having big payloads. You can limit the size by * configuring the <a href="http://camel.apache.org/how-do-i-set-the-max-chars-when-debug-logging-messages-in-camel.html">max debug log size</a>. 
*/ @@ -686,15 +689,29 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R serverRequest(brave, serviceName, exchange); } } + + // add on completion after the route is done, but before the consumer writes the response + // this allows us to track the zipkin event before returning the response which is the right time + exchange.addOnCompletion(new SynchronizationAdapter() { + @Override + public void onAfterRoute(Route route, Exchange exchange) { + String serviceName = getServiceName(exchange, route.getEndpoint(), true, false); + Brave brave = getBrave(serviceName); + if (brave != null) { + serverResponse(brave, serviceName, exchange); + } + } + + @Override + public String toString() { + return "ZipkinTracerOnCompletion"; + } + }); } @Override public void onExchangeDone(Route route, Exchange exchange) { - String serviceName = getServiceName(exchange, route.getEndpoint(), true, false); - Brave brave = getBrave(serviceName); - if (brave != null) { - serverResponse(brave, serviceName, exchange); - } + // noop } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinSimpleLogStreamsRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinSimpleLogStreamsRouteTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinSimpleLogStreamsRouteTest.java new file mode 100644 index 0000000..4eab008 --- /dev/null +++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinSimpleLogStreamsRouteTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.zipkin; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class ZipkinSimpleLogStreamsRouteTest extends CamelTestSupport { + + private ZipkinTracer zipkin; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + zipkin = new ZipkinTracer(); + zipkin.setServiceName("dude"); + zipkin.setIncludeMessageBodyStreams(true); + zipkin.setSpanCollector(new ZipkinLoggingSpanCollector()); + + // attaching ourself to CamelContext + zipkin.init(context); + + return context; + } + + @Test + public void testZipkinRoute() throws Exception { + NotifyBuilder notify = new NotifyBuilder(context).whenDone(5).create(); + + for (int i = 0; i < 5; i++) { + template.sendBody("seda:dude", "Hello World"); + } + + assertTrue(notify.matches(30, TimeUnit.SECONDS)); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:dude").routeId("dude") + .log("routing at ${routeId}") + .delay(simple("${random(1000,2000)}")); + } + }; + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/examples/camel-example-zipkin/client/src/main/java/sample/camel/ClientRoute.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-zipkin/client/src/main/java/sample/camel/ClientRoute.java b/examples/camel-example-zipkin/client/src/main/java/sample/camel/ClientRoute.java index 1ba7ad4..ece56de 100644 --- a/examples/camel-example-zipkin/client/src/main/java/sample/camel/ClientRoute.java +++ b/examples/camel-example-zipkin/client/src/main/java/sample/camel/ClientRoute.java @@ -23,10 +23,11 @@ public class ClientRoute extends RouteBuilder { @Override public void configure() { // you can configure the route rule with Java DSL here - from("timer:trigger?exchangePattern=InOut&period=30s") + from("timer:trigger?exchangePattern=InOut&period=30s").streamCaching() .bean("counterBean") + .log(" Client request: ${body}") .to("http://localhost:9090/service1") - .log("Result: ${body}"); + .log("Client response: ${body}"); } } http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/examples/camel-example-zipkin/service1/src/main/java/sample/camel/Service1Route.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-zipkin/service1/src/main/java/sample/camel/Service1Route.java b/examples/camel-example-zipkin/service1/src/main/java/sample/camel/Service1Route.java index f8789c5..598f50e 100644 --- a/examples/camel-example-zipkin/service1/src/main/java/sample/camel/Service1Route.java +++ b/examples/camel-example-zipkin/service1/src/main/java/sample/camel/Service1Route.java @@ -24,13 +24,13 @@ public class Service1Route extends RouteBuilder { @Override public void configure() throws Exception { - from("jetty:http://0.0.0.0:{{service1.port}}/service1").routeId("service1") + from("jetty:http://0.0.0.0:{{service1.port}}/service1").routeId("service1").streamCaching() .removeHeaders("CamelHttp*") - 
.convertBodyTo(String.class) + .log("Service1 request: ${body}") .delay(simple("${random(1000,2000)}")) - .transform(simple("Service1: ${body}")) - .to("http://0.0.0.0:{{service2.port}}/service2"); - + .transform(simple("Service1-${body}")) + .to("http://0.0.0.0:{{service2.port}}/service2") + .log("Service1 response: ${body}"); } } http://git-wip-us.apache.org/repos/asf/camel/blob/6c81334c/examples/camel-example-zipkin/service2/src/main/java/sample/camel/Service2Route.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-zipkin/service2/src/main/java/sample/camel/Service2Route.java b/examples/camel-example-zipkin/service2/src/main/java/sample/camel/Service2Route.java index edc5146..7174b9e 100644 --- a/examples/camel-example-zipkin/service2/src/main/java/sample/camel/Service2Route.java +++ b/examples/camel-example-zipkin/service2/src/main/java/sample/camel/Service2Route.java @@ -37,10 +37,11 @@ public class Service2Route extends RouteBuilder { // add zipkin to CamelContext zipkin.init(getContext()); - from("undertow:http://0.0.0.0:7070/service2").routeId("service2") - .convertBodyTo(String.class) + from("undertow:http://0.0.0.0:7070/service2").routeId("service2").streamCaching() + .log(" Service2 request: ${body}") .delay(simple("${random(1000,2000)}")) - .transform(simple("Service2: ${body}")); + .transform(simple("Service2-${body}")) + .log("Service2 response: ${body}"); } }