This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.7.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.7.x by this push:
     new 26b54e1 Fixes CAMEL-16453. Update the Exchange object in serverRequest with the new span so that it is visible downstream via Exchange headers. In clientRequest do not assume the last server span is the parent for the current traceContext, as this will not be true in case of parallelProcessing. Modify ZipkinState to use Deque instead of Stack to allow retrieval of matching spanId. (#5296)
26b54e1 is described below

commit 26b54e1f2c55e774874c3e793ba5effd63fcbc27
Author: Samrat Dhillon <samrat.dhil...@gmail.com>
AuthorDate: Tue Apr 6 00:49:41 2021 -0400

    Fixes CAMEL-16453. Update the Exchange object in serverRequest with the new span so that it is visible downstream via Exchange headers. In clientRequest do not assume the last server span is the parent for the current traceContext, as this will not be true in case of parallelProcessing. Modify ZipkinState to use Deque instead of Stack to allow retrieval of matching spanId.
(#5296) Co-authored-by: Samrat Dhillon <samrat.dhil...@innovapost.com> --- .../java/org/apache/camel/zipkin/ZipkinState.java | 47 +++++++++++---- .../java/org/apache/camel/zipkin/ZipkinTracer.java | 7 +-- .../org/apache/camel/zipkin/ZipkinStateTest.java | 68 ++++++++++++++++++++++ 3 files changed, 107 insertions(+), 15 deletions(-) diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinState.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinState.java index 5f2d160..6cfa2f4 100644 --- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinState.java +++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinState.java @@ -16,9 +16,12 @@ */ package org.apache.camel.zipkin; -import java.util.Stack; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Iterator; import brave.Span; +import brave.propagation.TraceContextOrSamplingFlags; import org.apache.camel.Exchange; /** @@ -31,39 +34,61 @@ public final class ZipkinState { public static final String KEY = "CamelZipkinState"; - private final Stack<Span> clientSpans = new Stack<>(); - private final Stack<Span> serverSpans = new Stack<>(); + private final Deque<Span> clientSpans = new ArrayDeque<>(); + private final Deque<Span> serverSpans = new ArrayDeque<>(); - public void pushClientSpan(Span span) { + public synchronized void pushClientSpan(Span span) { clientSpans.push(span); } - public Span popClientSpan() { - if (!clientSpans.empty()) { + public synchronized Span popClientSpan() { + if (!clientSpans.isEmpty()) { return clientSpans.pop(); } else { return null; } } - public void pushServerSpan(Span span) { + public synchronized void pushServerSpan(Span span) { serverSpans.push(span); } - public Span popServerSpan() { - if (!serverSpans.empty()) { + public synchronized Span popServerSpan() { + if (!serverSpans.isEmpty()) { return serverSpans.pop(); } else { return null; } } - public Span peekServerSpan() { - if 
(!serverSpans.empty()) { + private Span peekServerSpan() { + if (!serverSpans.isEmpty()) { return serverSpans.peek(); } else { return null; } } + public synchronized Span findMatchingServerSpan(Exchange exchange) { + String spanId = (String) exchange.getIn().getHeader(ZipkinConstants.SPAN_ID); + Span lastSpan = peekServerSpan(); + if (spanId == null) { + return lastSpan; + } + TraceContextOrSamplingFlags traceContext + = ZipkinTracer.EXTRACTOR.extract(new CamelRequest(exchange.getIn(), Span.Kind.SERVER)); + if (traceContext.context().spanId() == lastSpan.context().spanId()) { + return lastSpan; + } + + Iterator<Span> spanItr = serverSpans.iterator(); + while (spanItr.hasNext()) { + Span span = spanItr.next(); + if (span.context().spanId() == traceContext.context().spanId()) { + return span; + } + } + return lastSpan; + } + } diff --git a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java index 57deb50..7883981 100644 --- a/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java +++ b/components/camel-zipkin/src/main/java/org/apache/camel/zipkin/ZipkinTracer.java @@ -120,7 +120,7 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory, private static final String ZIPKIN_COLLECTOR_THRIFT_SERVICE = "zipkin-collector-thrift"; private static final Getter<CamelRequest, String> GETTER = (cr, key) -> cr.getHeader(key); private static final Setter<CamelRequest, String> SETTER = (cr, key, value) -> cr.setHeader(key, value); - private static final Extractor<CamelRequest> EXTRACTOR = B3Propagation.B3_STRING.extractor(GETTER); + static final Extractor<CamelRequest> EXTRACTOR = B3Propagation.B3_STRING.extractor(GETTER); private static final Injector<CamelRequest> INJECTOR = B3Propagation.B3_STRING.injector(SETTER); private final ZipkinEventNotifier eventNotifier = new ZipkinEventNotifier(); @@ -593,7 +593,7 @@ public 
class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory, } // if we started from a server span then lets reuse that when we call a // downstream service - Span last = state.peekServerSpan(); + Span last = state.findMatchingServerSpan(event.getExchange()); Span span; if (last != null) { span = brave.tracer().newChild(last.context()); @@ -685,14 +685,13 @@ public class ZipkinTracer extends ServiceSupport implements RoutePolicyFactory, TraceContextOrSamplingFlags sampleFlag = EXTRACTOR.extract(cr); if (ObjectHelper.isEmpty(sampleFlag)) { span = brave.tracer().nextSpan(); - INJECTOR.inject(span.context(), cr); } else { span = brave.tracer().nextSpan(sampleFlag); } span.kind(spanKind).start(); ZipkinServerRequestAdapter parser = new ZipkinServerRequestAdapter(this, exchange); parser.onRequest(exchange, span.customizer()); - + INJECTOR.inject(span.context(), cr); // store span after request state.pushServerSpan(span); TraceContext context = span.context(); diff --git a/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinStateTest.java b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinStateTest.java new file mode 100644 index 0000000..00917b7 --- /dev/null +++ b/components/camel-zipkin/src/test/java/org/apache/camel/zipkin/ZipkinStateTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.zipkin; + +import brave.Span; +import brave.Tracing; +import brave.propagation.TraceContext; +import brave.propagation.TraceContextOrSamplingFlags; +import org.apache.camel.Exchange; +import org.apache.camel.spring.SpringCamelContext; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ZipkinStateTest { + + private ZipkinState state = new ZipkinState(); + + @Test + public void testZipkinState() { + TraceContext context = TraceContext.newBuilder().traceId(1L).spanId(2L).parentId(3L).build(); + TraceContextOrSamplingFlags sampling = TraceContextOrSamplingFlags.newBuilder(context).build(); + Tracing tracing = Tracing.newBuilder().build(); + + Span span1 = tracing.tracer().nextSpan(sampling); + state.pushServerSpan(span1); + + Span span2 = tracing.tracer().nextSpan(sampling); + state.pushServerSpan(span2); + + Exchange exchange1 = new DefaultExchange(new SpringCamelContext()); + exchange1.getIn().setHeader(ZipkinConstants.TRACE_ID, context.traceIdString()); + exchange1.getIn().setHeader(ZipkinConstants.PARENT_SPAN_ID, context.spanIdString()); + exchange1.getIn().setHeader(ZipkinConstants.SPAN_ID, span1.context().spanIdString()); + + Exchange exchange2 = new DefaultExchange(new SpringCamelContext()); + exchange2.getIn().setHeader(ZipkinConstants.TRACE_ID, context.traceIdString()); + exchange2.getIn().setHeader(ZipkinConstants.PARENT_SPAN_ID, context.spanIdString()); + 
exchange2.getIn().setHeader(ZipkinConstants.SPAN_ID, span2.context().spanIdString()); + + Span retrived = state.findMatchingServerSpan(exchange2); + assertThat(retrived.context().spanId()).isEqualTo(span2.context().spanId()); + assertThat(retrived.context().parentId()).isEqualTo(span2.context().parentId()); + assertThat(retrived.context().traceId()).isEqualTo(span2.context().traceId()); + + retrived = state.findMatchingServerSpan(exchange1); + assertThat(retrived.context().spanId()).isEqualTo(span1.context().spanId()); + assertThat(retrived.context().parentId()).isEqualTo(span1.context().parentId()); + assertThat(retrived.context().traceId()).isEqualTo(span1.context().traceId()); + + } + +}