Repository: camel
Updated Branches:
  refs/heads/master f2cc3de50 -> a8910d540


Support for async response handling and multicast/parallel. Included local 
implementation of SpanManager to add support for transferring managed span 
stack between threads.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a8910d54
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a8910d54
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a8910d54

Branch: refs/heads/master
Commit: a8910d5400e33a4a832e878932c7d14d43021cff
Parents: f2cc3de
Author: Gary Brown <g...@brownuk.com>
Authored: Mon Mar 20 17:14:11 2017 +0000
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Mar 22 13:29:54 2017 +0100

----------------------------------------------------------------------
 .../camel/opentracing/OpenTracingTracer.java    |  50 +++--
 .../concurrent/CamelSpanManager.java            | 207 +++++++++++++++++++
 .../OpenTracingExecutorServiceManager.java      | 200 ++++++++++++++++++
 .../CamelOpenTracingTestSupport.java            |  75 ++++---
 .../opentracing/MulticastParallelRouteTest.java |  80 +++++++
 5 files changed, 574 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a8910d54/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java
 
b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java
index e662d24..d2f3b28 100644
--- 
a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java
+++ 
b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java
@@ -27,7 +27,6 @@ import io.opentracing.Span;
 import io.opentracing.Tracer;
 import io.opentracing.Tracer.SpanBuilder;
 import io.opentracing.contrib.global.GlobalTracer;
-import io.opentracing.contrib.spanmanager.DefaultSpanManager;
 import io.opentracing.contrib.spanmanager.SpanManager;
 import io.opentracing.propagation.Format;
 import io.opentracing.tag.Tags;
@@ -41,6 +40,8 @@ import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.management.event.ExchangeSendingEvent;
 import org.apache.camel.management.event.ExchangeSentEvent;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.opentracing.concurrent.CamelSpanManager;
+import 
org.apache.camel.opentracing.concurrent.OpenTracingExecutorServiceManager;
 import org.apache.camel.opentracing.propagation.CamelHeadersExtractAdapter;
 import org.apache.camel.opentracing.propagation.CamelHeadersInjectAdapter;
 import org.apache.camel.spi.RoutePolicy;
@@ -65,10 +66,12 @@ public class OpenTracingTracer extends ServiceSupport 
implements RoutePolicyFact
 
     private static final Logger LOG = 
LoggerFactory.getLogger(OpenTracingTracer.class);
 
+    private static final String MANAGED_SPAN_PROPERTY = "ManagedSpan";
+
     private static Map<String, SpanDecorator> decorators = new HashMap<>();
 
     private final OpenTracingEventNotifier eventNotifier = new 
OpenTracingEventNotifier();
-    private final SpanManager spanManager = DefaultSpanManager.getInstance();
+    private final CamelSpanManager spanManager = 
CamelSpanManager.getInstance();
     private Tracer tracer;
     private CamelContext camelContext;
 
@@ -103,6 +106,10 @@ public class OpenTracingTracer extends ServiceSupport 
implements RoutePolicyFact
             try {
                 // start this service eager so we init before Camel is 
starting up
                 camelContext.addService(this, true, true);
+
+                // Wrap the ExecutorServiceManager with a SpanManager aware 
version
+                camelContext.setExecutorServiceManager(
+                        new 
OpenTracingExecutorServiceManager(camelContext.getExecutorServiceManager(), 
spanManager));
             } catch (Exception e) {
                 throw ObjectHelper.wrapRuntimeCamelException(e);
             }
@@ -187,18 +194,26 @@ public class OpenTracingTracer extends ServiceSupport 
implements RoutePolicyFact
                 sd.pre(span, ese.getExchange(), ese.getEndpoint());
                 tracer.inject(span.context(), Format.Builtin.TEXT_MAP,
                     new 
CamelHeadersInjectAdapter(ese.getExchange().getIn().getHeaders()));
-                spanManager.activate(span);
+                ese.getExchange().setProperty(MANAGED_SPAN_PROPERTY, 
spanManager.activate(span));
+                spanManager.clear();
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("OpenTracing: start client span=" + span);
                 }
             } else if (event instanceof ExchangeSentEvent) {
-                SpanManager.ManagedSpan managedSpan = spanManager.current();
+                ExchangeSentEvent ese = (ExchangeSentEvent) event;
+                SpanManager.ManagedSpan managedSpan = (SpanManager.ManagedSpan)
+                        ese.getExchange().getProperty(MANAGED_SPAN_PROPERTY);
+                if (managedSpan != null) {
+                    spanManager.activate(managedSpan);
+                    ese.getExchange().setProperty(MANAGED_SPAN_PROPERTY, null);
+                } else {
+                    managedSpan = spanManager.current();
+                }
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("OpenTracing: start client span=" + 
managedSpan.getSpan());
                 }
-                SpanDecorator sd = getSpanDecorator(((ExchangeSentEvent) 
event).getEndpoint());
-                sd.post(managedSpan.getSpan(), ((ExchangeSentEvent) 
event).getExchange(),
-                    ((ExchangeSentEvent) event).getEndpoint());
+                SpanDecorator sd = getSpanDecorator(ese.getEndpoint());
+                sd.post(managedSpan.getSpan(), ese.getExchange(), 
ese.getEndpoint());
                 managedSpan.getSpan().finish();
                 managedSpan.deactivate();
             }
@@ -223,6 +238,11 @@ public class OpenTracingTracer extends ServiceSupport 
implements RoutePolicyFact
 
         @Override
         public void onExchangeBegin(Route route, Exchange exchange) {
+            // Check if continuing exchange on same thread
+            if (exchange.getProperties().containsKey(MANAGED_SPAN_PROPERTY)) {
+                
spanManager.activate((SpanManager.ManagedSpan)exchange.getProperty(MANAGED_SPAN_PROPERTY));
+                exchange.setProperty(MANAGED_SPAN_PROPERTY, null);
+            }
             SpanDecorator sd = getSpanDecorator(route.getEndpoint());
             Span span = tracer.buildSpan(sd.getOperationName(exchange, 
route.getEndpoint()))
                 .asChildOf(tracer.extract(Format.Builtin.TEXT_MAP,
@@ -239,13 +259,17 @@ public class OpenTracingTracer extends ServiceSupport 
implements RoutePolicyFact
         @Override
         public void onExchangeDone(Route route, Exchange exchange) {
             SpanManager.ManagedSpan managedSpan = spanManager.current();
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("OpenTracing: finish server span=" + 
managedSpan.getSpan());
+            if (managedSpan.getSpan() != null) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("OpenTracing: finish server span=" + 
managedSpan.getSpan());
+                }
+                SpanDecorator sd = getSpanDecorator(route.getEndpoint());
+                sd.post(managedSpan.getSpan(), exchange, route.getEndpoint());
+                managedSpan.getSpan().finish();
+                managedSpan.deactivate();
+            } else {
+                LOG.warn("OpenTracing: could not find managed span for 
exchange=" + exchange);
             }
-            SpanDecorator sd = getSpanDecorator(route.getEndpoint());
-            sd.post(managedSpan.getSpan(), exchange, route.getEndpoint());
-            managedSpan.getSpan().finish();
-            managedSpan.deactivate();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a8910d54/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/CamelSpanManager.java
----------------------------------------------------------------------
diff --git 
a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/CamelSpanManager.java
 
b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/CamelSpanManager.java
new file mode 100644
index 0000000..c5c34af
--- /dev/null
+++ 
b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/CamelSpanManager.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentracing.concurrent;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import io.opentracing.NoopSpan;
+import io.opentracing.Span;
+import io.opentracing.contrib.spanmanager.SpanManager;
+
+/**
+ * Camel {@link SpanManager} implementation using {@link ThreadLocal} storage
+ * maintaining a stack-like structure of linked managed spans.
+ * <p>
+ * The linked managed spans provide the following stack unwinding algorithm:
+ * <ol>
+ * <li>If the deactivated span is not the <em>managed</em> span, the 
<em>current managed</em> span is left alone.</li>
+ * <li>Otherwise, the first parent that is <em>not yet deactivated</em> is set 
as the new managed span.</li>
+ * <li>If no managed parents remain, the <em>managed span</em> is cleared.</li>
+ * <li>Consecutive <code>deactivate()</code> calls for already-deactivated 
spans will be ignored.</li>
+ * </ol>
+ * <p>
+ * NOTE: This implementation has been copied and extended from 
opentracing-contrib/java-spanmanager
+ * project, to provide the additional functionality for transferring the 
managed spans from one thread
+ * to another. This functionality will soon be provided by the core 
opentracing-java project, at which
+ * time this implementation will be removed.
+ */
+public final class CamelSpanManager implements SpanManager {
+
+    private static final Logger LOGGER = 
Logger.getLogger(CamelSpanManager.class.getName());
+    private static final CamelSpanManager INSTANCE = new CamelSpanManager();
+    private static final ManagedSpan NO_MANAGED_SPAN = new NoManagedSpan();
+
+    private final ThreadLocal<LinkedManagedSpan> managed = new 
ThreadLocal<LinkedManagedSpan>();
+
+    private CamelSpanManager() {
+    }
+
+    /**
+     * @return The singleton instance of the camel span manager.
+     */
+    public static CamelSpanManager getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Stack unwinding algorithm that refreshes the currently managed span.
+     * <p>
+     * See {@link CamelSpanManager class javadoc} for a full description.
+     *
+     * @return The current non-deactivated LinkedManagedSpan or 
<code>null</code> if none remained.
+     */
+    private LinkedManagedSpan refreshCurrent() {
+        LinkedManagedSpan managedSpan = managed.get();
+        LinkedManagedSpan current = managedSpan;
+        while (current != null && current.deactivated.get()) { // Unwind stack 
if necessary.
+            current = current.parent;
+        }
+        if (current != managedSpan) { // refresh current if necessary.
+            if (current == null) {
+                managed.remove();
+            } else {
+                managed.set(current);
+            }
+        }
+        return current;
+    }
+
+    @Override
+    public ManagedSpan activate(Span span) {
+        LinkedManagedSpan managedSpan = new LinkedManagedSpan(span, 
refreshCurrent());
+        managed.set(managedSpan);
+        return managedSpan;
+    }
+
+    /**
+     * This method associates the supplied managed span with the current
+     * execution context (thread).
+     *
+     * @param managedSpan The managed span
+     */
+    public void activate(ManagedSpan managedSpan) {
+        if (managedSpan instanceof LinkedManagedSpan) {
+            managed.set((LinkedManagedSpan)managedSpan);
+        }
+    }
+
+    @Override
+    public ManagedSpan current() {
+        LinkedManagedSpan current = refreshCurrent();
+        return current != null ? current : NO_MANAGED_SPAN;
+    }
+
+    @Override
+    public void clear() {
+        managed.remove();
+    }
+
+    @Override
+    @Deprecated
+    public Span currentSpan() {
+        ManagedSpan current = current();
+        return current.getSpan() != null ? current.getSpan() : 
NoopSpan.INSTANCE;
+    }
+
+    /**
+     * @see #activate(Span)
+     * @deprecated renamed to activate()
+     */
+    @Deprecated
+    public ManagedSpan manage(Span span) {
+        return activate(span);
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName();
+    }
+
+    private final class LinkedManagedSpan implements ManagedSpan {
+        private final LinkedManagedSpan parent;
+        private final Span span;
+        private final AtomicBoolean deactivated = new AtomicBoolean(false);
+
+        private LinkedManagedSpan(Span span, LinkedManagedSpan parent) {
+            this.parent = parent;
+            this.span = span;
+        }
+
+        @Override
+        public Span getSpan() {
+            return span;
+        }
+
+        public void deactivate() {
+            if (deactivated.compareAndSet(false, true)) {
+                LinkedManagedSpan current = refreshCurrent(); // Trigger 
stack-unwinding algorithm.
+                LOGGER.log(Level.FINER, "Released {0}, current span is {1}.", 
new Object[]{this, current});
+            } else {
+                LOGGER.log(Level.FINEST, "No action needed, {0} was already 
deactivated.", this);
+            }
+        }
+
+        @Override
+        public void close() {
+            deactivate();
+        }
+
+        /**
+         * @see #deactivate()
+         * @deprecated renamed to deactivate()
+         */
+        @Deprecated
+        public void release() {
+            deactivate();
+        }
+
+        @Override
+        public String toString() {
+            return getClass().getSimpleName() + '{' + span + '}';
+        }
+    }
+
+    /**
+     * Empty implementation signifying there is no managed span.
+     */
+    private static final class NoManagedSpan implements ManagedSpan {
+        private NoManagedSpan() {
+        }
+
+        @Override
+        public Span getSpan() {
+            return null;
+        }
+
+        @Override
+        public void deactivate() {
+            // no-op
+        }
+
+        @Override
+        public void close() {
+            // no-op
+        }
+
+        @Override
+        public String toString() {
+            return getClass().getSimpleName();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a8910d54/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/OpenTracingExecutorServiceManager.java
----------------------------------------------------------------------
diff --git 
a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/OpenTracingExecutorServiceManager.java
 
b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/OpenTracingExecutorServiceManager.java
new file mode 100644
index 0000000..2b19a88
--- /dev/null
+++ 
b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/concurrent/OpenTracingExecutorServiceManager.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentracing.concurrent;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import io.opentracing.contrib.spanmanager.SpanManager;
+import 
io.opentracing.contrib.spanmanager.concurrent.SpanPropagatingExecutorService;
+
+import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.spi.ThreadPoolFactory;
+import org.apache.camel.spi.ThreadPoolProfile;
+
+public class OpenTracingExecutorServiceManager implements 
ExecutorServiceManager {
+
+    private final ExecutorServiceManager delegate;
+    private final SpanManager spanManager;
+
+    public OpenTracingExecutorServiceManager(ExecutorServiceManager delegate, 
SpanManager spanManager) {
+        this.delegate = delegate;
+        this.spanManager = spanManager;
+    }
+
+    @Override
+    public void shutdown() throws Exception {
+        delegate.shutdown();
+    }
+
+    @Override
+    public void start() throws Exception {
+        delegate.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        delegate.stop();
+    }
+
+    @Override
+    public boolean awaitTermination(ExecutorService arg0, long arg1) throws 
InterruptedException {
+        return delegate.awaitTermination(arg0, arg1);
+    }
+
+    @Override
+    public ThreadPoolProfile getDefaultThreadPoolProfile() {
+        return delegate.getDefaultThreadPoolProfile();
+    }
+
+    @Override
+    public long getShutdownAwaitTermination() {
+        return delegate.getShutdownAwaitTermination();
+    }
+
+    @Override
+    public String getThreadNamePattern() {
+        return delegate.getThreadNamePattern();
+    }
+
+    @Override
+    public ThreadPoolFactory getThreadPoolFactory() {
+        return delegate.getThreadPoolFactory();
+    }
+
+    @Override
+    public ThreadPoolProfile getThreadPoolProfile(String arg0) {
+        return delegate.getThreadPoolProfile(arg0);
+    }
+
+    @Override
+    public ExecutorService newCachedThreadPool(Object arg0, String arg1) {
+        return new 
SpanPropagatingExecutorService(delegate.newCachedThreadPool(arg0, arg1), 
spanManager);
+    }
+
+    @Override
+    public ScheduledExecutorService newDefaultScheduledThreadPool(Object arg0, 
String arg1) {
+        return delegate.newDefaultScheduledThreadPool(arg0, arg1);
+    }
+
+    @Override
+    public ExecutorService newDefaultThreadPool(Object arg0, String arg1) {
+        return new 
SpanPropagatingExecutorService(delegate.newDefaultThreadPool(arg0, arg1), 
spanManager);
+    }
+
+    @Override
+    public ExecutorService newFixedThreadPool(Object arg0, String arg1, int 
arg2) {
+        return new 
SpanPropagatingExecutorService(delegate.newFixedThreadPool(arg0, arg1, arg2), 
spanManager);
+    }
+
+    @Override
+    public ScheduledExecutorService newScheduledThreadPool(Object arg0, String 
arg1, int arg2) {
+        return delegate.newScheduledThreadPool(arg0, arg1, arg2);
+    }
+
+    @Override
+    public ScheduledExecutorService newScheduledThreadPool(Object arg0, String 
arg1, ThreadPoolProfile arg2) {
+        return delegate.newScheduledThreadPool(arg0, arg1, arg2);
+    }
+
+    @Override
+    public ScheduledExecutorService newScheduledThreadPool(Object arg0, String 
arg1, String arg2) {
+        return delegate.newScheduledThreadPool(arg0, arg1, arg2);
+    }
+
+    @Override
+    public ExecutorService newSingleThreadExecutor(Object arg0, String arg1) {
+        return new 
SpanPropagatingExecutorService(delegate.newSingleThreadExecutor(arg0, arg1), 
spanManager);
+    }
+
+    @Override
+    public ScheduledExecutorService newSingleThreadScheduledExecutor(Object 
arg0, String arg1) {
+        return delegate.newSingleThreadScheduledExecutor(arg0, arg1);
+    }
+
+    @Override
+    public Thread newThread(String arg0, Runnable arg1) {
+        return delegate.newThread(arg0, arg1);
+    }
+
+    @Override
+    public ExecutorService newThreadPool(Object arg0, String arg1, 
ThreadPoolProfile arg2) {
+        return new SpanPropagatingExecutorService(delegate.newThreadPool(arg0, 
arg1, arg2), spanManager);
+    }
+
+    @Override
+    public ExecutorService newThreadPool(Object arg0, String arg1, String 
arg2) {
+        return new SpanPropagatingExecutorService(delegate.newThreadPool(arg0, 
arg1, arg2), spanManager);
+    }
+
+    @Override
+    public ExecutorService newThreadPool(Object arg0, String arg1, int arg2, 
int arg3) {
+        return new SpanPropagatingExecutorService(delegate.newThreadPool(arg0, 
arg1, arg2, arg3), spanManager);
+    }
+
+    @Override
+    public void registerThreadPoolProfile(ThreadPoolProfile arg0) {
+        delegate.registerThreadPoolProfile(arg0);
+    }
+
+    @Override
+    public String resolveThreadName(String arg0) {
+        return delegate.resolveThreadName(arg0);
+    }
+
+    @Override
+    public void setDefaultThreadPoolProfile(ThreadPoolProfile arg0) {
+        delegate.setDefaultThreadPoolProfile(arg0);
+    }
+
+    @Override
+    public void setShutdownAwaitTermination(long arg0) {
+        delegate.setShutdownAwaitTermination(arg0);
+    }
+
+    @Override
+    public void setThreadNamePattern(String arg0) throws 
IllegalArgumentException {
+        delegate.setThreadNamePattern(arg0);
+    }
+
+    @Override
+    public void setThreadPoolFactory(ThreadPoolFactory arg0) {
+        delegate.setThreadPoolFactory(arg0);
+    }
+
+    @Override
+    public void shutdown(ExecutorService arg0) {
+        delegate.shutdown(arg0);
+    }
+
+    @Override
+    public void shutdownGraceful(ExecutorService arg0) {
+        delegate.shutdownGraceful(arg0);
+    }
+
+    @Override
+    public void shutdownGraceful(ExecutorService arg0, long arg1) {
+        delegate.shutdownGraceful(arg0, arg1);
+    }
+
+    @Override
+    public List<Runnable> shutdownNow(ExecutorService arg0) {
+        return delegate.shutdownNow(arg0);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a8910d54/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/CamelOpenTracingTestSupport.java
----------------------------------------------------------------------
diff --git 
a/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/CamelOpenTracingTestSupport.java
 
b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/CamelOpenTracingTestSupport.java
index 6c7ad98..a48cee0 100644
--- 
a/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/CamelOpenTracingTestSupport.java
+++ 
b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/CamelOpenTracingTestSupport.java
@@ -18,11 +18,15 @@ package org.apache.camel.opentracing;
 
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 import io.opentracing.Span;
+import io.opentracing.mock.MockSpan;
 import io.opentracing.mock.MockTracer;
 import io.opentracing.mock.MockTracer.Propagator;
 import io.opentracing.tag.Tags;
@@ -63,42 +67,63 @@ public class CamelOpenTracingTestSupport extends 
CamelTestSupport {
     }
 
     protected void verify() {
+        verify(false);
+    }
+
+    protected void verify(boolean async) {
         assertEquals("Incorrect number of spans", testdata.length, 
tracer.finishedSpans().size());
 
+        verifySameTrace();
+
+        List<MockSpan> spans = tracer.finishedSpans();
+        if (async) {
+            final List<MockSpan> unsortedSpans = spans;
+            spans = Arrays.asList(testdata).stream()
+                    .map(td -> findSpan(td, 
unsortedSpans)).distinct().collect(Collectors.toList());
+            assertEquals("Incorrect number of spans after sorting", 
testdata.length, spans.size());
+        }
+
         for (int i = 0; i < testdata.length; i++) {
-            if (i > 0) {
-                assertEquals(testdata[i].getLabel(), 
tracer.finishedSpans().get(0).context().traceId(),
-                    tracer.finishedSpans().get(i).context().traceId());
-            }
+            verifySpan(i, testdata, spans);
+        }
+    }
 
-            String component = (String) 
tracer.finishedSpans().get(i).tags().get(Tags.COMPONENT.getKey());
-            assertNotNull(component);
-            assertEquals(testdata[i].getLabel(),
-                SpanDecorator.CAMEL_COMPONENT + URI.create((String) 
testdata[i].getUri()).getScheme(),
-                component);
-            assertEquals(testdata[i].getLabel(), testdata[i].getUri(),
-                tracer.finishedSpans().get(i).tags().get("camel.uri"));
-
-            // If span associated with TestSEDASpanDecorator, check that 
pre/post tags have been defined
-            if ("camel-seda".equals(component)) {
-                
assertTrue(tracer.finishedSpans().get(i).tags().containsKey("pre"));
-                
assertTrue(tracer.finishedSpans().get(i).tags().containsKey("post"));
-            }
+    protected MockSpan findSpan(SpanTestData testdata, List<MockSpan> spans) {
+        return spans.stream().filter(s -> 
s.operationName().equals(testdata.getOperation())
+                && s.tags().get("camel.uri").equals(testdata.getUri())
+                && 
s.tags().get(Tags.SPAN_KIND.getKey()).equals(testdata.getKind())).findFirst().orElse(null);
+    }
 
-            assertEquals(testdata[i].getLabel(), testdata[i].getOperation(), 
tracer.finishedSpans().get(i).operationName());
+    protected void verifySpan(int index, SpanTestData[] testdata, 
List<MockSpan> spans) {
+        String component = (String) 
spans.get(index).tags().get(Tags.COMPONENT.getKey());
+        assertNotNull(component);
+        assertEquals(testdata[index].getLabel(),
+            SpanDecorator.CAMEL_COMPONENT + URI.create((String) 
testdata[index].getUri()).getScheme(),
+            component);
+        assertEquals(testdata[index].getLabel(), testdata[index].getUri(), 
spans.get(index).tags().get("camel.uri"));
+
+        // If span associated with TestSEDASpanDecorator, check that pre/post 
tags have been defined
+        if ("camel-seda".equals(component)) {
+            assertTrue(spans.get(index).tags().containsKey("pre"));
+            assertTrue(spans.get(index).tags().containsKey("post"));
+        }
 
-            assertEquals(testdata[i].getLabel(), testdata[i].getKind(),
-                
tracer.finishedSpans().get(i).tags().get(Tags.SPAN_KIND.getKey()));
+        assertEquals(testdata[index].getLabel(), 
testdata[index].getOperation(), spans.get(index).operationName());
 
-            if (testdata[i].getParentId() != -1) {
-                assertEquals(testdata[i].getLabel(),
-                    
tracer.finishedSpans().get(testdata[i].getParentId()).context().spanId(),
-                    tracer.finishedSpans().get(i).parentId());
-            }
+        assertEquals(testdata[index].getLabel(), testdata[index].getKind(),
+                spans.get(index).tags().get(Tags.SPAN_KIND.getKey()));
 
+        if (testdata[index].getParentId() != -1) {
+            assertEquals(testdata[index].getLabel(),
+                spans.get(testdata[index].getParentId()).context().spanId(),
+                spans.get(index).parentId());
         }
     }
 
+    protected void verifySameTrace() {
+        assertEquals(1, tracer.finishedSpans().stream().map(s -> 
s.context().traceId()).distinct().count());
+    }
+
     protected void verifyTraceSpanNumbers(int numOfTraces, int 
numSpansPerTrace) {
         Map<Long, List<Span>> traces = new HashMap<Long, List<Span>>();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a8910d54/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/MulticastParallelRouteTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/MulticastParallelRouteTest.java
 
b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/MulticastParallelRouteTest.java
new file mode 100644
index 0000000..856f931
--- /dev/null
+++ 
b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/MulticastParallelRouteTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentracing;
+
+import io.opentracing.tag.Tags;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class MulticastParallelRouteTest extends CamelOpenTracingTestSupport {
+
+    private static SpanTestData[] testdata = {
+        new SpanTestData().setLabel("seda:b 
server").setUri("seda://b").setOperation("b")
+            .setKind(Tags.SPAN_KIND_SERVER).setParentId(1),
+        new SpanTestData().setLabel("seda:b 
client").setUri("seda://b").setOperation("b")
+            .setKind(Tags.SPAN_KIND_CLIENT).setParentId(4),
+        new SpanTestData().setLabel("seda:c 
server").setUri("seda://c").setOperation("c")
+            .setKind(Tags.SPAN_KIND_SERVER).setParentId(3),
+        new SpanTestData().setLabel("seda:c 
client").setUri("seda://c").setOperation("c")
+            .setKind(Tags.SPAN_KIND_CLIENT).setParentId(4),
+        new SpanTestData().setLabel("seda:a 
server").setUri("seda://a").setOperation("a")
+            .setKind(Tags.SPAN_KIND_SERVER).setParentId(5),
+        new SpanTestData().setLabel("seda:a 
client").setUri("seda://a").setOperation("a")
+            .setKind(Tags.SPAN_KIND_CLIENT).setParentId(6),
+        new SpanTestData().setLabel("direct:start 
server").setUri("direct://start").setOperation("start")
+            .setKind(Tags.SPAN_KIND_SERVER).setParentId(7),
+        new SpanTestData().setLabel("direct:start 
client").setUri("direct://start").setOperation("start")
+            .setKind(Tags.SPAN_KIND_CLIENT)
+    };
+
+    public MulticastParallelRouteTest() {
+        super(testdata);
+    }
+
+    @Test
+    public void testRoute() throws Exception {
+        template.requestBody("direct:start", "Hello");
+
+        verify(true);
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("seda:a").routeId("start");
+
+                from("seda:a").routeId("a")
+                    .log("routing at ${routeId}")
+                    .multicast().parallelProcessing()
+                    .to("seda:b", "seda:c")
+                    .end()
+                    .log("End of routing");
+
+                from("seda:b").routeId("b")
+                    .log("routing at ${routeId}")
+                    .delay(simple("${random(1000,2000)}"));
+
+                from("seda:c").routeId("c")
+                    .log("routing at ${routeId}")
+                    .delay(simple("${random(0,100)}"));
+            }
+        };
+    }
+}

Reply via email to