Repository: camel Updated Branches: refs/heads/master 41805bb84 -> 08aea81cc
CAMEL-9795: camel-zipkin - Reuse existing span for complex eips like multicast. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/08aea81c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/08aea81c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/08aea81c Branch: refs/heads/master Commit: 08aea81cca31d3564e93a109c91abf51264667b8 Parents: 41805bb Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Apr 5 13:47:34 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Apr 5 13:47:34 2016 +0200 ---------------------------------------------------------------------- .../camel/management/BacklogTracerTest.java | 2 +- .../org/apache/camel/zipkin/ZipkinTracer.java | 235 +++++++++---------- .../zipkin/ManagedZipkinSimpleRouteTest.java | 93 ++++++++ .../src/test/resources/log4j.properties | 1 + 4 files changed, 208 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/08aea81c/camel-core/src/test/java/org/apache/camel/management/BacklogTracerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/BacklogTracerTest.java b/camel-core/src/test/java/org/apache/camel/management/BacklogTracerTest.java index 56d039c..47fa310 100644 --- a/camel-core/src/test/java/org/apache/camel/management/BacklogTracerTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/BacklogTracerTest.java @@ -40,7 +40,7 @@ public class BacklogTracerTest extends ManagementTestSupport { MBeanServer mbeanServer = getMBeanServer(); ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogTracer"); assertNotNull(on); - mbeanServer.isRegistered(on); + assertTrue(mbeanServer.isRegistered(on)); Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); assertEquals("Should not be enabled", Boolean.FALSE, enabled); http://git-wip-us.apache.org/repos/asf/camel/blob/08aea81c/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java index f311aaf..3081285 100644 --- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java +++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java @@ -36,7 +36,7 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Route; -import org.apache.camel.StatefulService; +import org.apache.camel.StaticService; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.properties.ServiceHostPropertiesFunction; @@ -50,16 +50,20 @@ import org.apache.camel.model.RouteDefinition; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.RoutePolicyFactory; import org.apache.camel.support.EventNotifierSupport; +import org.apache.camel.support.RoutePolicySupport; +import org.apache.camel.support.ServiceSupport; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.EndpointHelper; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.camel.builder.ExpressionBuilder.routeIdExpression; /** - * To use zipkin with Camel then setup this {@link ZipkinTracer} in your Camel application. + * To use Zipkin with Camel then setup this {@link ZipkinTracer} in your Camel application. * <p/> * Events (span) are captured for incoming and outgoing messages being sent to/from Camel. * This means you need to configure which which Camel endpoints that maps to zipkin service names. @@ -89,8 +93,10 @@ import static org.apache.camel.builder.ExpressionBuilder.routeIdExpression; * if the {@link Exchange} sends messages, then we track them using the {@link org.apache.camel.spi.EventNotifier}. */ @ManagedResource(description = "ZipkinTracer") -public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, RoutePolicyFactory, StatefulService, CamelContextAware { +public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory, StaticService, CamelContextAware { + private static final Logger LOG = LoggerFactory.getLogger(ZipkinTracer.class); + private final ZipkinEventNotifier eventNotifier = new ZipkinEventNotifier(); private final Map<String, Brave> braves = new HashMap<>(); private transient boolean useFallbackServiceNames; @@ -108,15 +114,22 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R public ZipkinTracer() { } + + @Override + public RoutePolicy createRoutePolicy(CamelContext camelContext, String routeId, RouteDefinition route) { + return new ZipkinRoutePolicy(routeId); + } + /** * Registers this {@link ZipkinTracer} on the {@link CamelContext}. */ public void init(CamelContext camelContext) { - if (!camelContext.getManagementStrategy().getEventNotifiers().contains(this)) { - camelContext.getManagementStrategy().addEventNotifier(this); - } - if (!camelContext.getRoutePolicyFactories().contains(this)) { - camelContext.addRoutePolicyFactory(this); + if (!camelContext.hasService(this)) { + try { + camelContext.addService(this, true); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } } } @@ -284,20 +297,23 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R @Override protected void doStart() throws Exception { - super.doStart(); - ObjectHelper.notNull(camelContext, "CamelContext", this); + camelContext.getManagementStrategy().addEventNotifier(eventNotifier); + if (!camelContext.getRoutePolicyFactories().contains(this)) { + camelContext.addRoutePolicyFactory(this); + } + if (spanCollector == null) { if (hostName != null && port > 0) { - log.info("Configuring Zipkin ScribeSpanCollector using host: {} and port: {}", hostName, port); + LOG.info("Configuring Zipkin ScribeSpanCollector using host: {} and port: {}", hostName, port); spanCollector = new ScribeSpanCollector(hostName, port); } else { // is there a zipkin service setup as ENV variable to auto register a scribe span collector String host = new ServiceHostPropertiesFunction().apply("zipkin-collector"); String port = new ServicePortPropertiesFunction().apply("zipkin-collector"); if (ObjectHelper.isNotEmpty(host) && ObjectHelper.isNotEmpty(port)) { - log.info("Auto-configuring Zipkin ScribeSpanCollector using host: {} and port: {}", host, port); + LOG.info("Auto-configuring Zipkin ScribeSpanCollector using host: {} and port: {}", host, port); int num = camelContext.getTypeConverter().mandatoryConvertTo(Integer.class, port); spanCollector = new ScribeSpanCollector(host, num); } @@ -307,7 +323,7 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R ObjectHelper.notNull(spanCollector, "SpanCollector", this); if (clientServiceMappings.isEmpty() && serverServiceMappings.isEmpty()) { - log.warn("No service name(s) has been mapped in clientServiceMappings or serverServiceMappings. Camel will fallback and use endpoint uris as service names."); + LOG.warn("No service name(s) has been mapped in clientServiceMappings or serverServiceMappings. Camel will fallback and use endpoint uris as service names."); useFallbackServiceNames = true; } @@ -328,8 +344,6 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R @Override protected void doStop() throws Exception { - super.doStop(); - // stop and close collector ServiceHelper.stopAndShutdownService(spanCollector); if (spanCollector instanceof Closeable) { @@ -338,19 +352,10 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R braves.clear(); - camelContext.getManagementStrategy().removeEventNotifier(this); + camelContext.getManagementStrategy().removeEventNotifier(eventNotifier); camelContext.getRoutePolicyFactories().remove(this); } - @Override - public boolean isEnabled(EventObject event) { - return event instanceof ExchangeSendingEvent - || event instanceof ExchangeSentEvent - || event instanceof ExchangeCreatedEvent - || event instanceof ExchangeCompletedEvent - || event instanceof ExchangeFailedEvent; - } - private String getServiceName(Exchange exchange, Endpoint endpoint, boolean server, boolean client) { if (client) { return getServiceName(exchange, endpoint, clientServiceMappings); @@ -436,13 +441,13 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R return null; } } - if (log.isTraceEnabled() && key != null) { - log.trace("Using serviceName: {} as fallback", key); + if (LOG.isTraceEnabled() && key != null) { + LOG.trace("Using serviceName: {} as fallback", key); } return key; } else { - if (log.isTraceEnabled() && answer != null) { - log.trace("Using serviceName: {}", answer); + if (LOG.isTraceEnabled() && answer != null) { + LOG.trace("Using serviceName: {}", answer); } return answer; } @@ -467,7 +472,7 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R brave = braves.get(serviceName); if (brave == null && useFallbackServiceNames) { - log.debug("Creating Brave assigned to serviceName: {}", serviceName + " as fallback"); + LOG.debug("Creating Brave assigned to serviceName: {}", serviceName + " as fallback"); Brave.Builder builder = new Brave.Builder(serviceName); builder = builder.traceSampler(Sampler.create(rate)); if (spanCollector != null) { @@ -481,29 +486,6 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R return brave; } - @Override - public void notify(EventObject event) throws Exception { - // use event notifier to track events when Camel messages to endpoints - // these events corresponds to Zipkin client events - - // client events - if (event instanceof ExchangeSendingEvent) { - ExchangeSendingEvent ese = (ExchangeSendingEvent) event; - String serviceName = getServiceName(ese.getExchange(), ese.getEndpoint(), false, true); - Brave brave = getBrave(serviceName); - if (brave != null) { - clientRequest(brave, serviceName, ese); - } - } else if (event instanceof ExchangeSentEvent) { - ExchangeSentEvent ese = (ExchangeSentEvent) event; - String serviceName = getServiceName(ese.getExchange(), ese.getEndpoint(), false, true); - Brave brave = getBrave(serviceName); - if (brave != null) { - clientResponse(brave, serviceName, ese); - } - } - } - private void clientRequest(Brave brave, String serviceName, ExchangeSendingEvent event) { ClientSpanThreadBinder clientBinder = brave.clientSpanThreadBinder(); ServerSpanThreadBinder serverBinder = brave.serverSpanThreadBinder(); @@ -529,15 +511,15 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R clientBinder.setCurrentSpan(null); serverBinder.setCurrentSpan(null); - if (span != null && log.isDebugEnabled()) { + if (span != null && LOG.isDebugEnabled()) { String traceId = "" + span.getTrace_id(); String spanId = "" + span.getId(); String parentId = span.getParent_id() != null ? "" + span.getParent_id() : null; - if (log.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { if (parentId != null) { - log.debug(String.format("clientRequest [service=%s, traceId=%20s, spanId=%20s, parentId=%20s]", serviceName, traceId, spanId, parentId)); + LOG.debug(String.format("clientRequest [service=%s, traceId=%20s, spanId=%20s, parentId=%20s]", serviceName, traceId, spanId, parentId)); } else { - log.debug(String.format("clientRequest [service=%s, traceId=%20s, spanId=%20s]", serviceName, traceId, spanId)); + LOG.debug(String.format("clientRequest [service=%s, traceId=%20s, spanId=%20s]", serviceName, traceId, spanId)); } } } @@ -558,15 +540,15 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R // and reset binder clientBinder.setCurrentSpan(null); - if (log.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { String traceId = "" + span.getTrace_id(); String spanId = "" + span.getId(); String parentId = span.getParent_id() != null ? "" + span.getParent_id() : null; - if (log.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { if (parentId != null) { - log.debug(String.format("clientResponse[service=%s, traceId=%20s, spanId=%20s, parentId=%20s]", serviceName, traceId, spanId, parentId)); + LOG.debug(String.format("clientResponse[service=%s, traceId=%20s, spanId=%20s, parentId=%20s]", serviceName, traceId, spanId, parentId)); } else { - log.debug(String.format("clientResponse[service=%s, traceId=%20s, spanId=%20s]", serviceName, traceId, spanId)); + LOG.debug(String.format("clientResponse[service=%s, traceId=%20s, spanId=%20s]", serviceName, traceId, spanId)); } } } @@ -596,15 +578,15 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R // and reset binder serverBinder.setCurrentSpan(null); - if (span != null && span.getSpan() != null && log.isDebugEnabled()) { + if (span != null && span.getSpan() != null && LOG.isDebugEnabled()) { String traceId = "" + span.getSpan().getTrace_id(); String spanId = "" + span.getSpan().getId(); String parentId = span.getSpan().getParent_id() != null ? "" + span.getSpan().getParent_id() : null; - if (log.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { if (parentId != null) { - log.debug(String.format("serverRequest [service=%s, traceId=%20s, spanId=%20s, parentId=%20s]", serviceName, traceId, spanId, parentId)); + LOG.debug(String.format("serverRequest [service=%s, traceId=%20s, spanId=%20s, parentId=%20s]", serviceName, traceId, spanId, parentId)); } else { - log.debug(String.format("serverRequest [service=%s, traceId=%20s, spanId=%20s]", serviceName, traceId, spanId)); + LOG.debug(String.format("serverRequest [service=%s, traceId=%20s, spanId=%20s]", serviceName, traceId, spanId)); } } } @@ -627,15 +609,15 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R // and reset binder serverBinder.setCurrentSpan(null); - if (span.getSpan() != null && log.isDebugEnabled()) { + if (span.getSpan() != null && LOG.isDebugEnabled()) { String traceId = "" + span.getSpan().getTrace_id(); String spanId = "" + span.getSpan().getId(); String parentId = span.getSpan().getParent_id() != null ? "" + span.getSpan().getParent_id() : null; - if (log.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { if (parentId != null) { - log.debug(String.format("serverResponse[service=%s, traceId=%20s, spanId=%20s, parentId=%20s]", serviceName, traceId, spanId, parentId)); + LOG.debug(String.format("serverResponse[service=%s, traceId=%20s, spanId=%20s, parentId=%20s]", serviceName, traceId, spanId, parentId)); } else { - log.debug(String.format("serverResponse[service=%s, traceId=%20s, spanId=%20s]", serviceName, traceId, spanId)); + LOG.debug(String.format("serverResponse[service=%s, traceId=%20s, spanId=%20s]", serviceName, traceId, spanId)); } } } @@ -647,75 +629,84 @@ public class ZipkinTracer extends EventNotifierSupport implements RoutePolicy, R return exchange.getIn().getHeader(ZipkinConstants.TRACE_ID) != null; } - @Override - public void onInit(Route route) { - // noop - } + private final class ZipkinEventNotifier extends EventNotifierSupport { - @Override - public void onRemove(Route route) { - // noop - } + @Override + public void notify(EventObject event) throws Exception { + // use event notifier to track events when Camel messages to endpoints + // these events corresponds to Zipkin client events - @Override - public void onStart(Route route) { - // noop - } + // client events + if (event instanceof ExchangeSendingEvent) { + ExchangeSendingEvent ese = (ExchangeSendingEvent) event; + String serviceName = getServiceName(ese.getExchange(), ese.getEndpoint(), false, true); + Brave brave = getBrave(serviceName); + if (brave != null) { + clientRequest(brave, serviceName, ese); + } + } else if (event instanceof ExchangeSentEvent) { + ExchangeSentEvent ese = (ExchangeSentEvent) event; + String serviceName = getServiceName(ese.getExchange(), ese.getEndpoint(), false, true); + Brave brave = getBrave(serviceName); + if (brave != null) { + clientResponse(brave, serviceName, ese); + } + } + } - @Override - public void onStop(Route route) { - // noop - } + @Override + public boolean isEnabled(EventObject event) { + return event instanceof ExchangeSendingEvent + || event instanceof ExchangeSentEvent + || event instanceof ExchangeCreatedEvent + || event instanceof ExchangeCompletedEvent + || event instanceof ExchangeFailedEvent; + } - @Override - public void onSuspend(Route route) { - // noop + @Override + public String toString() { + return "ZipkinEventNotifier"; + } } - @Override - public void onResume(Route route) { - // noop - } + private final class ZipkinRoutePolicy extends RoutePolicySupport { - @Override - public void onExchangeBegin(Route route, Exchange exchange) { - // use route policy to track events when Camel a Camel route begins/end the lifecycle of an Exchange - // these events corresponds to Zipkin server events - - if (hasZipkinTraceId(exchange)) { - String serviceName = getServiceName(exchange, route.getEndpoint(), true, false); - Brave brave = getBrave(serviceName); - if (brave != null) { - serverRequest(brave, serviceName, exchange); - } + private final String routeId; + + public ZipkinRoutePolicy(String routeId) { + this.routeId = routeId; } + @Override + public void onExchangeBegin(Route route, Exchange exchange) { + // use route policy to track events when Camel a Camel route begins/end the lifecycle of an Exchange + // these events corresponds to Zipkin server events - // add on completion after the route is done, but before the consumer writes the response - // this allows us to track the zipkin event before returning the response which is the right time - exchange.addOnCompletion(new SynchronizationAdapter() { - @Override - public void onAfterRoute(Route route, Exchange exchange) { + if (hasZipkinTraceId(exchange)) { String serviceName = getServiceName(exchange, route.getEndpoint(), true, false); Brave brave = getBrave(serviceName); if (brave != null) { - serverResponse(brave, serviceName, exchange); + serverRequest(brave, serviceName, exchange); } } - @Override - public String toString() { - return "ZipkinTracerOnCompletion"; - } - }); - } + // add on completion after the route is done, but before the consumer writes the response + // this allows us to track the zipkin event before returning the response which is the right time + exchange.addOnCompletion(new SynchronizationAdapter() { + @Override + public void onAfterRoute(Route route, Exchange exchange) { + String serviceName = getServiceName(exchange, route.getEndpoint(), true, false); + Brave brave = getBrave(serviceName); + if (brave != null) { + serverResponse(brave, serviceName, exchange); + } + } - @Override - public void onExchangeDone(Route route, Exchange exchange) { - // noop + @Override + public String toString() { + return "ZipkinTracerOnCompletion[" + routeId + "]"; + } + }); + } } - @Override - public RoutePolicy createRoutePolicy(CamelContext camelContext, String routeId, RouteDefinition route) { - return this; - } } http://git-wip-us.apache.org/repos/asf/camel/blob/08aea81c/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ManagedZipkinSimpleRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ManagedZipkinSimpleRouteTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ManagedZipkinSimpleRouteTest.java new file mode 100644 index 0000000..356889a --- /dev/null +++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ManagedZipkinSimpleRouteTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.zipkin; + +import java.util.concurrent.TimeUnit; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class ManagedZipkinSimpleRouteTest extends CamelTestSupport { + + private ZipkinTracer zipkin; + + @Override + protected boolean useJmx() { + return true; + } + + protected MBeanServer getMBeanServer() { + return context.getManagementStrategy().getManagementAgent().getMBeanServer(); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + zipkin = new ZipkinTracer(); + zipkin.setServiceName("dude"); + zipkin.setSpanCollector(new ZipkinLoggingSpanCollector()); + + // attaching ourself to CamelContext + zipkin.init(context); + + return context; + } + + @Test + public void testZipkinRoute() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MBeanServer mbeanServer = getMBeanServer(); + ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=services,name=ZipkinTracer"); + assertNotNull(on); + assertTrue(mbeanServer.isRegistered(on)); + + Float rate = (Float) mbeanServer.getAttribute(on, "Rate"); + assertEquals("Should be 1.0f", 1.0f, rate.floatValue(), 0.1f); + + NotifyBuilder notify = new NotifyBuilder(context).whenDone(5).create(); + + for (int i = 0; i < 5; i++) { + template.sendBody("seda:dude", "Hello World"); + } + + assertTrue(notify.matches(30, TimeUnit.SECONDS)); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:dude").routeId("dude") + .log("routing at ${routeId}") + .delay(simple("${random(10,20)}")); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08aea81c/components/camel-zipkin/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-zipkin/src/test/resources/log4j.properties b/components/camel-zipkin/src/test/resources/log4j.properties index ab7b29f..e675899 100644 --- a/components/camel-zipkin/src/test/resources/log4j.properties +++ b/components/camel-zipkin/src/test/resources/log4j.properties @@ -21,6 +21,7 @@ log4j.rootLogger=INFO, file #log4j.logger.org.apache.camel=DEBUG +#log4j.logger.org.apache.camel.management=DEBUG log4j.logger.org.apache.camel.zipkin=DEBUG #log4j.logger.com.github.kristofa.brave=DEBUG