Repository: camel Updated Branches: refs/heads/master f2cc3de50 -> a8910d540
Support for async response handling and multicast/parallel. Included local implementation of SpanManager to add support for transferring managed span stack between threads. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a8910d54 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a8910d54 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a8910d54 Branch: refs/heads/master Commit: a8910d5400e33a4a832e878932c7d14d43021cff Parents: f2cc3de Author: Gary Brown <g...@brownuk.com> Authored: Mon Mar 20 17:14:11 2017 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Mar 22 13:29:54 2017 +0100 ---------------------------------------------------------------------- .../camel/opentracing/OpenTracingTracer.java | 50 +++-- .../concurrent/CamelSpanManager.java | 207 +++++++++++++++++++ .../OpenTracingExecutorServiceManager.java | 200 ++++++++++++++++++ .../CamelOpenTracingTestSupport.java | 75 ++++--- .../opentracing/MulticastParallelRouteTest.java | 80 +++++++ 5 files changed, 574 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a8910d54/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java ---------------------------------------------------------------------- diff --git a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java index e662d24..d2f3b28 100644 --- a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java +++ b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java @@ -27,7 +27,6 @@ import io.opentracing.Span; import io.opentracing.Tracer; import io.opentracing.Tracer.SpanBuilder; import io.opentracing.contrib.global.GlobalTracer; -import io.opentracing.contrib.spanmanager.DefaultSpanManager; import io.opentracing.contrib.spanmanager.SpanManager; import io.opentracing.propagation.Format; import io.opentracing.tag.Tags; @@ -41,6 +40,8 @@ import org.apache.camel.api.management.ManagedResource; import org.apache.camel.management.event.ExchangeSendingEvent; import org.apache.camel.management.event.ExchangeSentEvent; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.opentracing.concurrent.CamelSpanManager; +import org.apache.camel.opentracing.concurrent.OpenTracingExecutorServiceManager; import org.apache.camel.opentracing.propagation.CamelHeadersExtractAdapter; import org.apache.camel.opentracing.propagation.CamelHeadersInjectAdapter; import org.apache.camel.spi.RoutePolicy; @@ -65,10 +66,12 @@ public class OpenTracingTracer extends ServiceSupport implements RoutePolicyFact private static final Logger LOG = LoggerFactory.getLogger(OpenTracingTracer.class); + private static final String MANAGED_SPAN_PROPERTY = "ManagedSpan"; + private static Map<String, SpanDecorator> decorators = new HashMap<>(); private final OpenTracingEventNotifier eventNotifier = new OpenTracingEventNotifier(); - private final SpanManager spanManager = DefaultSpanManager.getInstance(); + private final CamelSpanManager spanManager = CamelSpanManager.getInstance(); private Tracer tracer; private CamelContext camelContext; @@ -103,6 +106,10 @@ public class OpenTracingTracer extends ServiceSupport implements RoutePolicyFact try { // start this service eager so we init before Camel is starting up camelContext.addService(this, true, true); + + // Wrap the ExecutorServiceManager with a SpanManager aware version + camelContext.setExecutorServiceManager( + new OpenTracingExecutorServiceManager(camelContext.getExecutorServiceManager(), spanManager)); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } @@ -187,18 +194,26 @@ public class OpenTracingTracer extends ServiceSupport implements RoutePolicyFact sd.pre(span, ese.getExchange(), ese.getEndpoint()); tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new CamelHeadersInjectAdapter(ese.getExchange().getIn().getHeaders())); - spanManager.activate(span); + ese.getExchange().setProperty(MANAGED_SPAN_PROPERTY, spanManager.activate(span)); + spanManager.clear(); if (LOG.isTraceEnabled()) { LOG.trace("OpenTracing: start client span=" + span); } } else if (event instanceof ExchangeSentEvent) { - SpanManager.ManagedSpan managedSpan = spanManager.current(); + ExchangeSentEvent ese = (ExchangeSentEvent) event; + SpanManager.ManagedSpan managedSpan = (SpanManager.ManagedSpan) + ese.getExchange().getProperty(MANAGED_SPAN_PROPERTY); + if (managedSpan != null) { + spanManager.activate(managedSpan); + ese.getExchange().setProperty(MANAGED_SPAN_PROPERTY, null); + } else { + managedSpan = spanManager.current(); + } if (LOG.isTraceEnabled()) { LOG.trace("OpenTracing: start client span=" + managedSpan.getSpan()); } - SpanDecorator sd = getSpanDecorator(((ExchangeSentEvent) event).getEndpoint()); - sd.post(managedSpan.getSpan(), ((ExchangeSentEvent) event).getExchange(), - ((ExchangeSentEvent) event).getEndpoint()); + SpanDecorator sd = getSpanDecorator(ese.getEndpoint()); + sd.post(managedSpan.getSpan(), ese.getExchange(), ese.getEndpoint()); managedSpan.getSpan().finish(); managedSpan.deactivate(); } @@ -223,6 +238,11 @@ public class OpenTracingTracer extends ServiceSupport implements RoutePolicyFact @Override public void onExchangeBegin(Route route, Exchange exchange) { + // Check if continuing exchange on same thread + if (exchange.getProperties().containsKey(MANAGED_SPAN_PROPERTY)) { + spanManager.activate((SpanManager.ManagedSpan)exchange.getProperty(MANAGED_SPAN_PROPERTY)); + exchange.setProperty(MANAGED_SPAN_PROPERTY, null); + } SpanDecorator sd = getSpanDecorator(route.getEndpoint()); Span span = tracer.buildSpan(sd.getOperationName(exchange, route.getEndpoint())) .asChildOf(tracer.extract(Format.Builtin.TEXT_MAP, @@ -239,13 +259,17 @@ public class OpenTracingTracer extends ServiceSupport implements RoutePolicyFact @Override public void onExchangeDone(Route route, Exchange exchange) { SpanManager.ManagedSpan managedSpan = spanManager.current(); - if (LOG.isTraceEnabled()) { - LOG.trace("OpenTracing: finish server span=" + managedSpan.getSpan()); + if (managedSpan.getSpan() != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("OpenTracing: finish server span=" + managedSpan.getSpan()); + } + SpanDecorator sd = getSpanDecorator(route.getEndpoint()); + sd.post(managedSpan.getSpan(), exchange, route.getEndpoint()); + managedSpan.getSpan().finish(); + managedSpan.deactivate(); + } else { + LOG.warn("OpenTracing: could not find managed span for exchange=" + exchange); } - SpanDecorator sd = getSpanDecorator(route.getEndpoint()); - sd.post(managedSpan.getSpan(), exchange, route.getEndpoint()); - managedSpan.getSpan().finish(); - managedSpan.deactivate(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/a8910d54/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/CamelSpanManager.java ---------------------------------------------------------------------- diff --git a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/CamelSpanManager.java b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/CamelSpanManager.java new file mode 100644 index 0000000..c5c34af --- /dev/null +++ b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/CamelSpanManager.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.opentracing.concurrent; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +import io.opentracing.NoopSpan; +import io.opentracing.Span; +import io.opentracing.contrib.spanmanager.SpanManager; + +/** + * Camel {@link SpanManager} implementation using {@link ThreadLocal} storage + * maintaining a stack-like structure of linked managed spans. + * <p> + * The linked managed spans provide the following stack unwinding algorithm: + * <ol> + * <li>If the deactivated span is not the <em>managed</em> span, the <em>current managed</em> span is left alone.</li> + * <li>Otherwise, the first parent that is <em>not yet deactivated</em> is set as the new managed span.</li> + * <li>If no managed parents remain, the <em>managed span</em> is cleared.</li> + * <li>Consecutive <code>deactivate()</code> calls for already-deactivated spans will be ignored.</li> + * </ol> + * <p> + * NOTE: This implementation has been copied and extended from opentracing-contrib/java-spanmanager + * project, to provide the additional functionality for transferring the managed spans from one thread + * to another. This functionality will soon be provided by the core opentracing-java project, at which + * time this implementation will be removed. + */ +public final class CamelSpanManager implements SpanManager { + + private static final Logger LOGGER = Logger.getLogger(CamelSpanManager.class.getName()); + private static final CamelSpanManager INSTANCE = new CamelSpanManager(); + private static final ManagedSpan NO_MANAGED_SPAN = new NoManagedSpan(); + + private final ThreadLocal<LinkedManagedSpan> managed = new ThreadLocal<LinkedManagedSpan>(); + + private CamelSpanManager() { + } + + /** + * @return The singleton instance of the camel span manager. + */ + public static CamelSpanManager getInstance() { + return INSTANCE; + } + + /** + * Stack unwinding algorithm that refreshes the currently managed span. + * <p> + * See {@link CamelSpanManager class javadoc} for a full description. + * + * @return The current non-deactivated LinkedManagedSpan or <code>null</code> if none remained. + */ + private LinkedManagedSpan refreshCurrent() { + LinkedManagedSpan managedSpan = managed.get(); + LinkedManagedSpan current = managedSpan; + while (current != null && current.deactivated.get()) { // Unwind stack if necessary. + current = current.parent; + } + if (current != managedSpan) { // refresh current if necessary. + if (current == null) { + managed.remove(); + } else { + managed.set(current); + } + } + return current; + } + + @Override + public ManagedSpan activate(Span span) { + LinkedManagedSpan managedSpan = new LinkedManagedSpan(span, refreshCurrent()); + managed.set(managedSpan); + return managedSpan; + } + + /** + * This method associates the supplied managed span with the current + * execution context (thread). + * + * @param managedSpan The managed span + */ + public void activate(ManagedSpan managedSpan) { + if (managedSpan instanceof LinkedManagedSpan) { + managed.set((LinkedManagedSpan)managedSpan); + } + } + + @Override + public ManagedSpan current() { + LinkedManagedSpan current = refreshCurrent(); + return current != null ? current : NO_MANAGED_SPAN; + } + + @Override + public void clear() { + managed.remove(); + } + + @Override + @Deprecated + public Span currentSpan() { + ManagedSpan current = current(); + return current.getSpan() != null ? current.getSpan() : NoopSpan.INSTANCE; + } + + /** + * @see #activate(Span) + * @deprecated renamed to activate() + */ + @Deprecated + public ManagedSpan manage(Span span) { + return activate(span); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } + + private final class LinkedManagedSpan implements ManagedSpan { + private final LinkedManagedSpan parent; + private final Span span; + private final AtomicBoolean deactivated = new AtomicBoolean(false); + + private LinkedManagedSpan(Span span, LinkedManagedSpan parent) { + this.parent = parent; + this.span = span; + } + + @Override + public Span getSpan() { + return span; + } + + public void deactivate() { + if (deactivated.compareAndSet(false, true)) { + LinkedManagedSpan current = refreshCurrent(); // Trigger stack-unwinding algorithm. + LOGGER.log(Level.FINER, "Released {0}, current span is {1}.", new Object[]{this, current}); + } else { + LOGGER.log(Level.FINEST, "No action needed, {0} was already deactivated.", this); + } + } + + @Override + public void close() { + deactivate(); + } + + /** + * @see #deactivate() + * @deprecated renamed to deactivate() + */ + @Deprecated + public void release() { + deactivate(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + '{' + span + '}'; + } + } + + /** + * Empty implementation signifying there is no managed span. + */ + private static final class NoManagedSpan implements ManagedSpan { + private NoManagedSpan() { + } + + @Override + public Span getSpan() { + return null; + } + + @Override + public void deactivate() { + // no-op + } + + @Override + public void close() { + // no-op + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a8910d54/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/OpenTracingExecutorServiceManager.java ---------------------------------------------------------------------- diff --git a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/OpenTracingExecutorServiceManager.java b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/OpenTracingExecutorServiceManager.java new file mode 100644 index 0000000..2b19a88 --- /dev/null +++ b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/OpenTracingExecutorServiceManager.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.opentracing.concurrent; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import io.opentracing.contrib.spanmanager.SpanManager; +import io.opentracing.contrib.spanmanager.concurrent.SpanPropagatingExecutorService; + +import org.apache.camel.spi.ExecutorServiceManager; +import org.apache.camel.spi.ThreadPoolFactory; +import org.apache.camel.spi.ThreadPoolProfile; + +public class OpenTracingExecutorServiceManager implements ExecutorServiceManager { + + private final ExecutorServiceManager delegate; + private final SpanManager spanManager; + + public OpenTracingExecutorServiceManager(ExecutorServiceManager delegate, SpanManager spanManager) { + this.delegate = delegate; + this.spanManager = spanManager; + } + + @Override + public void shutdown() throws Exception { + delegate.shutdown(); + } + + @Override + public void start() throws Exception { + delegate.start(); + } + + @Override + public void stop() throws Exception { + delegate.stop(); + } + + @Override + public boolean awaitTermination(ExecutorService arg0, long arg1) throws InterruptedException { + return delegate.awaitTermination(arg0, arg1); + } + + @Override + public ThreadPoolProfile getDefaultThreadPoolProfile() { + return delegate.getDefaultThreadPoolProfile(); + } + + @Override + public long getShutdownAwaitTermination() { + return delegate.getShutdownAwaitTermination(); + } + + @Override + public String getThreadNamePattern() { + return delegate.getThreadNamePattern(); + } + + @Override + public ThreadPoolFactory getThreadPoolFactory() { + return delegate.getThreadPoolFactory(); + } + + @Override + public ThreadPoolProfile getThreadPoolProfile(String arg0) { + return delegate.getThreadPoolProfile(arg0); + } + + @Override + public ExecutorService newCachedThreadPool(Object arg0, String arg1) { + return new SpanPropagatingExecutorService(delegate.newCachedThreadPool(arg0, arg1), spanManager); + } + + @Override + public ScheduledExecutorService newDefaultScheduledThreadPool(Object arg0, String arg1) { + return delegate.newDefaultScheduledThreadPool(arg0, arg1); + } + + @Override + public ExecutorService newDefaultThreadPool(Object arg0, String arg1) { + return new SpanPropagatingExecutorService(delegate.newDefaultThreadPool(arg0, arg1), spanManager); + } + + @Override + public ExecutorService newFixedThreadPool(Object arg0, String arg1, int arg2) { + return new SpanPropagatingExecutorService(delegate.newFixedThreadPool(arg0, arg1, arg2), spanManager); + } + + @Override + public ScheduledExecutorService newScheduledThreadPool(Object arg0, String arg1, int arg2) { + return delegate.newScheduledThreadPool(arg0, arg1, arg2); + } + + @Override + public ScheduledExecutorService newScheduledThreadPool(Object arg0, String arg1, ThreadPoolProfile arg2) { + return delegate.newScheduledThreadPool(arg0, arg1, arg2); + } + + @Override + public ScheduledExecutorService newScheduledThreadPool(Object arg0, String arg1, String arg2) { + return delegate.newScheduledThreadPool(arg0, arg1, arg2); + } + + @Override + public ExecutorService newSingleThreadExecutor(Object arg0, String arg1) { + return new SpanPropagatingExecutorService(delegate.newSingleThreadExecutor(arg0, arg1), spanManager); + } + + @Override + public ScheduledExecutorService newSingleThreadScheduledExecutor(Object arg0, String arg1) { + return delegate.newSingleThreadScheduledExecutor(arg0, arg1); + } + + @Override + public Thread newThread(String arg0, Runnable arg1) { + return delegate.newThread(arg0, arg1); + } + + @Override + public ExecutorService newThreadPool(Object arg0, String arg1, ThreadPoolProfile arg2) { + return new SpanPropagatingExecutorService(delegate.newThreadPool(arg0, arg1, arg2), spanManager); + } + + @Override + public ExecutorService newThreadPool(Object arg0, String arg1, String arg2) { + return new SpanPropagatingExecutorService(delegate.newThreadPool(arg0, arg1, arg2), spanManager); + } + + @Override + public ExecutorService newThreadPool(Object arg0, String arg1, int arg2, int arg3) { + return new SpanPropagatingExecutorService(delegate.newThreadPool(arg0, arg1, arg2, arg3), spanManager); + } + + @Override + public void registerThreadPoolProfile(ThreadPoolProfile arg0) { + delegate.registerThreadPoolProfile(arg0); + } + + @Override + public String resolveThreadName(String arg0) { + return delegate.resolveThreadName(arg0); + } + + @Override + public void setDefaultThreadPoolProfile(ThreadPoolProfile arg0) { + delegate.setDefaultThreadPoolProfile(arg0); + } + + @Override + public void setShutdownAwaitTermination(long arg0) { + delegate.setShutdownAwaitTermination(arg0); + } + + @Override + public void setThreadNamePattern(String arg0) throws IllegalArgumentException { + delegate.setThreadNamePattern(arg0); + } + + @Override + public void setThreadPoolFactory(ThreadPoolFactory arg0) { + delegate.setThreadPoolFactory(arg0); + } + + @Override + public void shutdown(ExecutorService arg0) { + delegate.shutdown(arg0); + } + + @Override + public void shutdownGraceful(ExecutorService arg0) { + delegate.shutdownGraceful(arg0); + } + + @Override + public void shutdownGraceful(ExecutorService arg0, long arg1) { + delegate.shutdownGraceful(arg0, arg1); + } + + @Override + public List<Runnable> shutdownNow(ExecutorService arg0) { + return delegate.shutdownNow(arg0); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a8910d54/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/CamelOpenTracingTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/CamelOpenTracingTestSupport.java b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/CamelOpenTracingTestSupport.java index 6c7ad98..a48cee0 100644 --- a/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/CamelOpenTracingTestSupport.java +++ b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/CamelOpenTracingTestSupport.java @@ -18,11 +18,15 @@ package org.apache.camel.opentracing; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; import io.opentracing.Span; +import io.opentracing.mock.MockSpan; import io.opentracing.mock.MockTracer; import io.opentracing.mock.MockTracer.Propagator; import io.opentracing.tag.Tags; @@ -63,42 +67,63 @@ public class CamelOpenTracingTestSupport extends CamelTestSupport { } protected void verify() { + verify(false); + } + + protected void verify(boolean async) { assertEquals("Incorrect number of spans", testdata.length, tracer.finishedSpans().size()); + verifySameTrace(); + + List<MockSpan> spans = tracer.finishedSpans(); + if (async) { + final List<MockSpan> unsortedSpans = spans; + spans = Arrays.asList(testdata).stream() + .map(td -> findSpan(td, unsortedSpans)).distinct().collect(Collectors.toList()); + assertEquals("Incorrect number of spans after sorting", testdata.length, spans.size()); + } + for (int i = 0; i < testdata.length; i++) { - if (i > 0) { - assertEquals(testdata[i].getLabel(), tracer.finishedSpans().get(0).context().traceId(), - tracer.finishedSpans().get(i).context().traceId()); - } + verifySpan(i, testdata, spans); + } + } - String component = (String) tracer.finishedSpans().get(i).tags().get(Tags.COMPONENT.getKey()); - assertNotNull(component); - assertEquals(testdata[i].getLabel(), - SpanDecorator.CAMEL_COMPONENT + URI.create((String) testdata[i].getUri()).getScheme(), - component); - assertEquals(testdata[i].getLabel(), testdata[i].getUri(), - tracer.finishedSpans().get(i).tags().get("camel.uri")); - - // If span associated with TestSEDASpanDecorator, check that pre/post tags have been defined - if ("camel-seda".equals(component)) { - assertTrue(tracer.finishedSpans().get(i).tags().containsKey("pre")); - assertTrue(tracer.finishedSpans().get(i).tags().containsKey("post")); - } + protected MockSpan findSpan(SpanTestData testdata, List<MockSpan> spans) { + return spans.stream().filter(s -> s.operationName().equals(testdata.getOperation()) + && s.tags().get("camel.uri").equals(testdata.getUri()) + && s.tags().get(Tags.SPAN_KIND.getKey()).equals(testdata.getKind())).findFirst().orElse(null); + } - assertEquals(testdata[i].getLabel(), testdata[i].getOperation(), tracer.finishedSpans().get(i).operationName()); + protected void verifySpan(int index, SpanTestData[] testdata, List<MockSpan> spans) { + String component = (String) spans.get(index).tags().get(Tags.COMPONENT.getKey()); + assertNotNull(component); + assertEquals(testdata[index].getLabel(), + SpanDecorator.CAMEL_COMPONENT + URI.create((String) testdata[index].getUri()).getScheme(), + component); + assertEquals(testdata[index].getLabel(), testdata[index].getUri(), spans.get(index).tags().get("camel.uri")); + + // If span associated with TestSEDASpanDecorator, check that pre/post tags have been defined + if ("camel-seda".equals(component)) { + assertTrue(spans.get(index).tags().containsKey("pre")); + assertTrue(spans.get(index).tags().containsKey("post")); + } - assertEquals(testdata[i].getLabel(), testdata[i].getKind(), - tracer.finishedSpans().get(i).tags().get(Tags.SPAN_KIND.getKey())); + assertEquals(testdata[index].getLabel(), testdata[index].getOperation(), spans.get(index).operationName()); - if (testdata[i].getParentId() != -1) { - assertEquals(testdata[i].getLabel(), - tracer.finishedSpans().get(testdata[i].getParentId()).context().spanId(), - tracer.finishedSpans().get(i).parentId()); - } + assertEquals(testdata[index].getLabel(), testdata[index].getKind(), + spans.get(index).tags().get(Tags.SPAN_KIND.getKey())); + if (testdata[index].getParentId() != -1) { + assertEquals(testdata[index].getLabel(), + spans.get(testdata[index].getParentId()).context().spanId(), + spans.get(index).parentId()); } } + protected void verifySameTrace() { + assertEquals(1, tracer.finishedSpans().stream().map(s -> s.context().traceId()).distinct().count()); + } + protected void verifyTraceSpanNumbers(int numOfTraces, int numSpansPerTrace) { Map<Long, List<Span>> traces = new HashMap<Long, List<Span>>(); http://git-wip-us.apache.org/repos/asf/camel/blob/a8910d54/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/MulticastParallelRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/MulticastParallelRouteTest.java b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/MulticastParallelRouteTest.java new file mode 100644 index 0000000..856f931 --- /dev/null +++ b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/MulticastParallelRouteTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.opentracing; + +import io.opentracing.tag.Tags; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class MulticastParallelRouteTest extends CamelOpenTracingTestSupport { + + private static SpanTestData[] testdata = { + new SpanTestData().setLabel("seda:b server").setUri("seda://b").setOperation("b") + .setKind(Tags.SPAN_KIND_SERVER).setParentId(1), + new SpanTestData().setLabel("seda:b client").setUri("seda://b").setOperation("b") + .setKind(Tags.SPAN_KIND_CLIENT).setParentId(4), + new SpanTestData().setLabel("seda:c server").setUri("seda://c").setOperation("c") + .setKind(Tags.SPAN_KIND_SERVER).setParentId(3), + new SpanTestData().setLabel("seda:c client").setUri("seda://c").setOperation("c") + .setKind(Tags.SPAN_KIND_CLIENT).setParentId(4), + new SpanTestData().setLabel("seda:a server").setUri("seda://a").setOperation("a") + .setKind(Tags.SPAN_KIND_SERVER).setParentId(5), + new SpanTestData().setLabel("seda:a client").setUri("seda://a").setOperation("a") + .setKind(Tags.SPAN_KIND_CLIENT).setParentId(6), + new SpanTestData().setLabel("direct:start server").setUri("direct://start").setOperation("start") + .setKind(Tags.SPAN_KIND_SERVER).setParentId(7), + new SpanTestData().setLabel("direct:start client").setUri("direct://start").setOperation("start") + .setKind(Tags.SPAN_KIND_CLIENT) + }; + + public MulticastParallelRouteTest() { + super(testdata); + } + + @Test + public void testRoute() throws Exception { + template.requestBody("direct:start", "Hello"); + + verify(true); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("seda:a").routeId("start"); + + from("seda:a").routeId("a") + .log("routing at ${routeId}") + .multicast().parallelProcessing() + .to("seda:b", "seda:c") + .end() + .log("End of routing"); + + from("seda:b").routeId("b") + .log("routing at ${routeId}") + .delay(simple("${random(1000,2000)}")); + + from("seda:c").routeId("c") + .log("routing at ${routeId}") + .delay(simple("${random(0,100)}")); + } + }; + } +}