Repository: camel Updated Branches: refs/heads/master d1c3bbfe5 -> f8ef4eb1a
CAMEL-9759: camel-zipkin - Instrument Camel. Work in progress. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b48915a7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b48915a7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b48915a7 Branch: refs/heads/master Commit: b48915a71d80dc45f797ea4590390668408cbff8 Parents: 4b1737a Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Mar 29 14:15:54 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Mar 29 14:41:54 2016 +0200 ---------------------------------------------------------------------- .../camel/zipkin/ZipkinEventNotifier.java | 109 ++++++++++++++++--- .../zipkin/ZipkinSimpleFallbackRouteTest.java | 68 ++++++++++++ .../scribe/ZipkinOneRouteFallbackScribe.java | 74 +++++++++++++ 3 files changed, 238 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b48915a7/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinEventNotifier.java ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinEventNotifier.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinEventNotifier.java index f4bf0b0..5dd38e7 100644 --- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinEventNotifier.java +++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinEventNotifier.java @@ -19,7 +19,9 @@ package org.apache.camel.zipkin; import java.io.Closeable; import java.util.EventObject; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import com.github.kristofa.brave.Brave; import com.github.kristofa.brave.ClientSpanThreadBinder; @@ -59,7 +61,11 @@ import static org.apache.camel.builder.ExpressionBuilder.routeIdExpression; * For 
both kinds you can use wildcards and regular expressions to match, which is using the rules from * {@link EndpointHelper#matchPattern(String, String)} and {@link EndpointHelper#matchEndpoint(CamelContext, String, String)} * <p/> - * At least one mapping must be configured, you can use <tt>*</tt> to match all incoming and outgoing messages. + * To match all Camel messages you can use <tt>*</tt> in the pattern and configure that to the same service name. + * <br/> + * If no mapping has been configured then Camel will fall back and use endpoint URIs as service names. + * However, it's recommended to configure service mappings so you can use human-readable logical names instead of Camel + * endpoint URIs in the names. */ @ManagedResource(description = "Managing ZipkinEventNotifier") public class ZipkinEventNotifier extends EventNotifierSupport implements StatefulService { @@ -67,8 +73,10 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu private float rate = 1.0f; private SpanCollector spanCollector; private Map<String, String> serviceMappings = new HashMap<>(); + private Set<String> excludePatterns = new HashSet<>(); private Map<String, Brave> braves = new HashMap<>(); private boolean includeMessageBody; + private boolean useFallbackServiceNames; public ZipkinEventNotifier() { } @@ -128,6 +136,21 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu serviceMappings.put(pattern, serviceName); } + public Set<String> getExcludePatterns() { + return excludePatterns; + } + + public void setExcludePatterns(Set<String> excludePatterns) { + this.excludePatterns = excludePatterns; + } + + /** + * Adds an exclude pattern that will disable tracing with zipkin for Camel messages that match the pattern. 
+ */ + public void addExcludePattern(String pattern) { + excludePatterns.add(pattern); + } + @ManagedAttribute(description = "Whether to include the Camel message body in the zipkin traces") public boolean isIncludeMessageBody() { return includeMessageBody; @@ -149,7 +172,8 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu super.doStart(); if (serviceMappings.isEmpty()) { - throw new IllegalStateException("At least one service name must be configured"); + log.warn("No service name(s) has been configured. Camel will fallback and use endpoint uris as service names."); + useFallbackServiceNames = true; } // create braves mapped per service name @@ -196,9 +220,15 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu String id = routeIdExpression().evaluate(exchange, String.class); if (id != null) { + // exclude patterns take precedence + for (String pattern : excludePatterns) { + if (EndpointHelper.matchPattern(id, pattern)) { + return null; + } + } for (Map.Entry<String, String> entry : serviceMappings.entrySet()) { String pattern = entry.getKey(); - if (EndpointHelper.matchPattern(pattern, id)) { + if (EndpointHelper.matchPattern(id, pattern)) { answer = entry.getValue(); break; } @@ -208,9 +238,15 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu if (answer == null) { id = exchange.getFromRouteId(); if (id != null) { + // exclude patterns take precedence + for (String pattern : excludePatterns) { + if (EndpointHelper.matchPattern(id, pattern)) { + return null; + } + } for (Map.Entry<String, String> entry : serviceMappings.entrySet()) { String pattern = entry.getKey(); - if (EndpointHelper.matchPattern(pattern, id)) { + if (EndpointHelper.matchPattern(id, pattern)) { answer = entry.getValue(); break; } @@ -221,6 +257,12 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu if (answer == null && endpoint != null) { String url = 
endpoint.getEndpointUri(); if (url != null) { + // exclude patterns take precedence + for (String pattern : excludePatterns) { + if (EndpointHelper.matchPattern(url, pattern)) { + return null; + } + } for (Map.Entry<String, String> entry : serviceMappings.entrySet()) { String pattern = entry.getKey(); if (EndpointHelper.matchEndpoint(exchange.getContext(), url, pattern)) { @@ -234,6 +276,12 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu if (answer == null && exchange.getFromEndpoint() != null) { String url = exchange.getFromEndpoint().getEndpointUri(); if (url != null) { + // exclude patterns take precedence + for (String pattern : excludePatterns) { + if (EndpointHelper.matchPattern(url, pattern)) { + return null; + } + } for (Map.Entry<String, String> entry : serviceMappings.entrySet()) { String pattern = entry.getKey(); if (EndpointHelper.matchEndpoint(exchange.getContext(), url, pattern)) { @@ -244,15 +292,49 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu } } - return answer; + if (answer == null && useFallbackServiceNames) { + String key = null; + if (endpoint != null) { + key = endpoint.getEndpointKey(); + } else if (exchange.getFromEndpoint() != null) { + key = exchange.getFromEndpoint().getEndpointKey(); + } + // exclude patterns take precedence + for (String pattern : excludePatterns) { + if (EndpointHelper.matchPattern(key, pattern)) { + return null; + } + } + if (log.isTraceEnabled() && key != null) { + log.trace("Using serviceName: {} as fallback", key); + } + return key; + } else { + if (log.isTraceEnabled() && answer != null) { + log.trace("Using serviceName: {}", answer); + } + return answer; + } } private Brave getBrave(String serviceName) { + Brave brave = null; if (serviceName != null) { - return braves.get(serviceName); - } else { - return null; + brave = braves.get(serviceName); + + if (brave == null && useFallbackServiceNames) { + log.debug("Creating Brave assigned to 
serviceName: {}", serviceName + " as fallback"); + Brave.Builder builder = new Brave.Builder(serviceName); + builder = builder.traceSampler(Sampler.create(rate)); + if (spanCollector != null) { + builder = builder.spanCollector(spanCollector); + } + brave = builder.build(); + braves.put(serviceName, brave); + } } + + return brave; } @Override @@ -304,7 +386,7 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu event.getExchange().setProperty(key, span); if (log.isDebugEnabled()) { - log.debug("clientRequest[service={}, spanId={}]", serviceName, span != null ? span.getId() : "<null>"); + log.debug("clientRequest\t[service={}, spanId={}]", serviceName, span != null ? span.getId() : "<null>"); } } @@ -317,7 +399,8 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu binder.setCurrentSpan(null); if (log.isDebugEnabled()) { - log.debug("clientResponse[service={}, spanId={}]", serviceName, span != null ? span.getId() : "<null>"); + // one space to align client vs server in the logs + log.debug("clientResponse\t[service={}, spanId={}]", serviceName, span != null ? span.getId() : "<null>"); } } @@ -329,7 +412,7 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu event.getExchange().setProperty(key, span); if (log.isDebugEnabled()) { - log.debug("serverRequest[service={}, spanId={}]", serviceName, span != null ? span.getSpan().getId() : "<null>"); + log.debug("serverRequest\t[service={}, spanId={}]", serviceName, span != null ? span.getSpan().getId() : "<null>"); } } @@ -342,7 +425,7 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu binder.setCurrentSpan(null); if (log.isDebugEnabled()) { - log.debug("serverResponse[service={}, spanId={}, status=exchangeCompleted]", serviceName, span != null ? span.getSpan().getId() : "<null>"); + log.debug("serverResponse\t[service={}, spanId={}]\t[status=exchangeCompleted]", serviceName, span != null ? 
span.getSpan().getId() : "<null>"); } } @@ -355,7 +438,7 @@ public class ZipkinEventNotifier extends EventNotifierSupport implements Statefu binder.setCurrentSpan(null); if (log.isDebugEnabled()) { - log.debug("serverResponse[service={}, spanId={}, status=exchangeFailed]", serviceName, span != null ? span.getSpan().getId() : "<null>"); + log.debug("serverResponse[service={}, spanId={}]\t[status=exchangeFailed]", serviceName, span != null ? span.getSpan().getId() : "<null>"); } } http://git-wip-us.apache.org/repos/asf/camel/blob/b48915a7/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinSimpleFallbackRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinSimpleFallbackRouteTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinSimpleFallbackRouteTest.java new file mode 100644 index 0000000..5fa940a --- /dev/null +++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinSimpleFallbackRouteTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.zipkin; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class ZipkinSimpleFallbackRouteTest extends CamelTestSupport { + + private ZipkinEventNotifier zipkin; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + zipkin = new ZipkinEventNotifier(); + // no service so should use fallback naming style + // we do not want to trace any direct endpoints + zipkin.addExcludePattern("direct:*"); + zipkin.setSpanCollector(new ZipkinLoggingSpanCollector()); + context.getManagementStrategy().addEventNotifier(zipkin); + + return context; + } + + @Test + public void testZipkinRoute() throws Exception { + NotifyBuilder notify = new NotifyBuilder(context).whenDone(5).create(); + + for (int i = 0; i < 5; i++) { + template.sendBody("seda:dude", "Hello World"); + } + + assertTrue(notify.matches(30, TimeUnit.SECONDS)); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:dude").routeId("dude") + .log("routing at ${routeId}") + .delay(simple("${random(1000,2000)}")); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/b48915a7/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/scribe/ZipkinOneRouteFallbackScribe.java ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/scribe/ZipkinOneRouteFallbackScribe.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/scribe/ZipkinOneRouteFallbackScribe.java new file mode 100644 index 0000000..8f3ed4e --- /dev/null +++ 
b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/scribe/ZipkinOneRouteFallbackScribe.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.zipkin.scribe; + +import com.github.kristofa.brave.scribe.ScribeSpanCollector; +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.camel.zipkin.ZipkinEventNotifier; +import org.junit.Test; + +/** + * Integration test that requires a running Zipkin/Scribe instance. + * + * The easiest way is to run using docker-zipkin: https://github.com/openzipkin/docker-zipkin + * + * Adjust the IP address to the one docker-machine has assigned; you can find it with + * <tt>docker-machine ls</tt> + */ +public class ZipkinOneRouteFallbackScribe extends CamelTestSupport { + + private String ip = "192.168.99.100"; + private ZipkinEventNotifier zipkin; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + zipkin = new ZipkinEventNotifier(); + // no service so should use fallback naming style + // we do not want to trace any direct 
endpoints + zipkin.addExcludePattern("direct:*"); + zipkin.setIncludeMessageBody(true); + zipkin.setSpanCollector(new ScribeSpanCollector(ip, 9410)); + context.getManagementStrategy().addEventNotifier(zipkin); + + return context; + } + + @Test + public void testZipkinRoute() throws Exception { + template.requestBody("direct:start", "Hello Goofy"); + template.requestBody("direct:start", "Hello again Goofy"); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("seda:goofy"); + + from("seda:goofy").routeId("goofy") + .log("routing at ${routeId}") + .delay(simple("${random(1000,2000)}")); + } + }; + } +}