This is an automated email from the ASF dual-hosted git repository. ppalaga pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5ab7abb15ba5fa388c57f2a1c949bef68d6e0eba Author: Peter Palaga <ppal...@redhat.com> AuthorDate: Wed Jun 28 23:53:45 2023 +0200 Make CurrentSpanTest in Micrometer and Observation more robust by awaiting Span.current().getSpanContext().isValid() to become false after sending/receiving messages --- components/camel-observation/pom.xml | 6 ++++ .../CamelMicrometerObservationTestSupport.java | 12 ++++++- .../apache/camel/observation/CurrentSpanTest.java | 38 ++++++++++++++-------- components/camel-opentelemetry/pom.xml | 6 ++++ .../CamelOpenTelemetryTestSupport.java | 9 +++++ .../camel/opentelemetry/CurrentSpanTest.java | 37 ++++++++++++++------- 6 files changed, 82 insertions(+), 26 deletions(-) diff --git a/components/camel-observation/pom.xml b/components/camel-observation/pom.xml index 9d2d5dabf88..24aeb5748cf 100644 --- a/components/camel-observation/pom.xml +++ b/components/camel-observation/pom.xml @@ -80,6 +80,12 @@ <version>${opentelemetry-version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>${awaitility-version}</version> + <scope>test</scope> + </dependency> </dependencies> diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/CamelMicrometerObservationTestSupport.java b/components/camel-observation/src/test/java/org/apache/camel/observation/CamelMicrometerObservationTestSupport.java index 945c4f8b8f5..4022b6f186a 100644 --- a/components/camel-observation/src/test/java/org/apache/camel/observation/CamelMicrometerObservationTestSupport.java +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/CamelMicrometerObservationTestSupport.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import io.micrometer.core.instrument.MeterRegistry; @@ -56,6 +57,7 @@ import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import org.apache.camel.CamelContext; import org.apache.camel.test.junit5.CamelTestSupport; import org.apache.camel.tracing.SpanDecorator; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,7 +185,14 @@ class CamelMicrometerObservationTestSupport extends CamelTestSupport { } protected void verifyTraceSpanNumbers(int numOfTraces, int numSpansPerTrace) { - Map<String, List<SpanData>> traces = new HashMap<>(); + final Map<String, List<SpanData>> traces = new HashMap<>(); + + Awaitility.await() + .alias("inMemorySpanExporter.getFinishedSpanItems() should eventually contain all expected spans") + .atMost(5, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .pollDelay(0, TimeUnit.MILLISECONDS) + .until(() -> inMemorySpanExporter.getFinishedSpanItems().size() >= (numOfTraces * numSpansPerTrace)); List<SpanData> finishedSpans = inMemorySpanExporter.getFinishedSpanItems(); // Sort spans into separate traces @@ -192,6 +201,7 @@ class CamelMicrometerObservationTestSupport extends CamelTestSupport { spans.add(finishedSpans.get(i)); } + LOG.info("Found traces: " + traces); assertEquals(numOfTraces, traces.size()); for (Map.Entry<String, List<SpanData>> spans : traces.entrySet()) { diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/CurrentSpanTest.java b/components/camel-observation/src/test/java/org/apache/camel/observation/CurrentSpanTest.java index 158e5b8290a..d66e9ce170c 100644 --- a/components/camel-observation/src/test/java/org/apache/camel/observation/CurrentSpanTest.java +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/CurrentSpanTest.java @@ -46,10 +46,10 @@ import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.DefaultProducer; import org.apache.camel.tracing.ActiveSpanManager; import org.apache.camel.util.StopWatch; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -82,12 +82,11 @@ class CurrentSpanTest extends CamelMicrometerObservationTestSupport { // sync pipeline template.sendBody("direct:bar", "Hello World"); + awaitInvalidSpanContext(); List<SpanData> spans = verify(expectedSpans, false); assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); - // validates that span was active in async producer's processor - assertFalse(Span.current().getSpanContext().isValid()); } @Test @@ -102,12 +101,11 @@ class CurrentSpanTest extends CamelMicrometerObservationTestSupport { // sync to async pipeline template.sendBody("direct:foo", "Hello World"); + awaitInvalidSpanContext(); List<SpanData> spans = verify(expectedSpans, false); assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); - // context is cleaned up - assertFalse(Span.current().getSpanContext().isValid()); } @Test @@ -124,10 +122,10 @@ class CurrentSpanTest extends CamelMicrometerObservationTestSupport { // sync pipeline template.sendBody("asyncmock1:start", "Hello World"); + awaitInvalidSpanContext(); List<SpanData> spans = verify(expectedSpans, false); assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); - assertFalse(Span.current().getSpanContext().isValid()); } @Test @@ -143,10 +141,10 @@ class CurrentSpanTest extends CamelMicrometerObservationTestSupport { // async pipeline template.sendBody("asyncmock2:start", "Hello World"); + awaitInvalidSpanContext(); List<SpanData> spans = verify(expectedSpans, false); assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); - assertFalse(Span.current().getSpanContext().isValid()); } @Test @@ -159,7 +157,7 @@ class CurrentSpanTest extends CamelMicrometerObservationTestSupport { }; assertThrows(CamelExecutionException.class, () -> template.sendBody("asyncmock:fail", "Hello World")); - assertFalse(Span.current().getSpanContext().isValid()); + awaitInvalidSpanContext(); List<SpanData> spans = verify(expectedSpans, false); assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); @@ -186,19 +184,19 @@ class CurrentSpanTest extends CamelMicrometerObservationTestSupport { // sync pipeline template.sendBody("direct:start", "Hello World"); + awaitInvalidSpanContext(); List<SpanData> spans = verify(expectedSpans, false); assertEquals(spans.get(0).getParentSpanId(), spans.get(3).getSpanId()); assertEquals(spans.get(1).getParentSpanId(), spans.get(3).getSpanId()); assertEquals(spans.get(2).getParentSpanId(), spans.get(3).getSpanId()); - assertFalse(Span.current().getSpanContext().isValid()); } @Test void testContextDoesNotLeak() { for (int i = 0; i < 30; i++) { template.sendBody("asyncmock3:start", String.valueOf(i)); - assertFalse(Span.current().getSpanContext().isValid()); + awaitInvalidSpanContext(); } verifyTraceSpanNumbers(30, 11); @@ -262,7 +260,12 @@ class CurrentSpanTest extends CamelMicrometerObservationTestSupport { } - assertFalse(Span.current().getSpanContext().isValid(), errorMessage); + Awaitility.await() + .alias(errorMessage) + .atMost(5, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .pollDelay(0, TimeUnit.MILLISECONDS) + .until(() -> !Span.current().getSpanContext().isValid()); } private static class AsyncMockComponent extends MockComponent { @@ -274,8 +277,8 @@ class CurrentSpanTest extends CamelMicrometerObservationTestSupport { } private static class AsyncMockEndpoint extends MockEndpoint { - private static final Executor DELAYED - = CompletableFuture.delayedExecutor(10L, TimeUnit.MILLISECONDS, new ForkJoinPool(3)); + private static final Executor DELAYED = CompletableFuture.delayedExecutor(10L, TimeUnit.MILLISECONDS, + new ForkJoinPool(3)); private Consumer consumer; private final String key; @@ -361,4 +364,13 @@ class CurrentSpanTest extends CamelMicrometerObservationTestSupport { private static void assertCurrentSpan(Exchange exchange) { assertEquals(Span.current().getSpanContext().getSpanId(), ActiveSpanManager.getSpan(exchange).spanId()); } + + private void awaitInvalidSpanContext() { + Awaitility.await() + .alias("Span.current().getSpanContext().isValid() should eventually return false") + .atMost(5, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .pollDelay(0, TimeUnit.MILLISECONDS) + .until(() -> !Span.current().getSpanContext().isValid()); + } } diff --git a/components/camel-opentelemetry/pom.xml b/components/camel-opentelemetry/pom.xml index 284c7f172ca..3c91e03d650 100644 --- a/components/camel-opentelemetry/pom.xml +++ b/components/camel-opentelemetry/pom.xml @@ -90,6 +90,12 @@ <version>${opentelemetry-version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>${awaitility-version}</version> + <scope>test</scope> + </dependency> </dependencies> diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java index d0139a7f6a0..9555e747dd2 100644 --- a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java +++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import io.opentelemetry.api.common.AttributeKey; @@ -39,6 +40,7 @@ import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import org.apache.camel.CamelContext; import org.apache.camel.test.junit5.CamelTestSupport; import org.apache.camel.tracing.SpanDecorator; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,6 +138,12 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport { protected void verifyTraceSpanNumbers(int numOfTraces, int numSpansPerTrace) { Map<String, List<SpanData>> traces = new HashMap<>(); + Awaitility.await() + .alias("inMemorySpanExporter.getFinishedSpanItems() should eventually contain all expected spans") + .atMost(5, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .pollDelay(0, TimeUnit.MILLISECONDS) + .until(() -> inMemorySpanExporter.getFinishedSpanItems().size() >= (numOfTraces * numSpansPerTrace)); List<SpanData> finishedSpans = inMemorySpanExporter.getFinishedSpanItems(); // Sort spans into separate traces @@ -144,6 +152,7 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport { spans.add(finishedSpans.get(i)); } + LOG.info("Found traces: " + traces); assertEquals(numOfTraces, traces.size()); for (Map.Entry<String, List<SpanData>> spans : traces.entrySet()) { diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java index 25d147ede96..b2a70500593 100644 --- a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java +++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java @@ -46,10 +46,10 @@ import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.DefaultProducer; import org.apache.camel.tracing.ActiveSpanManager; import org.apache.camel.util.StopWatch; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -82,11 +82,10 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport { // sync pipeline template.sendBody("direct:bar", "Hello World"); + awaitInvalidSpanContext(); + List<SpanData> spans = verify(expectedSpans, false); assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); - - // validates that span was active in async producer's processor - assertFalse(Span.current().getSpanContext().isValid()); } @Test @@ -100,12 +99,11 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport { // sync to async pipeline template.sendBody("direct:foo", "Hello World"); + awaitInvalidSpanContext(); List<SpanData> spans = verify(expectedSpans, false); assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); - // context is cleaned up - assertFalse(Span.current().getSpanContext().isValid()); } @Test @@ -121,10 +119,10 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport { // sync pipeline template.sendBody("asyncmock1:start", "Hello World"); + awaitInvalidSpanContext(); List<SpanData> spans = verify(expectedSpans, false); assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); - assertFalse(Span.current().getSpanContext().isValid()); } @Test @@ -139,10 +137,10 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport { // async pipeline template.sendBody("asyncmock2:start", "Hello World"); + awaitInvalidSpanContext(); List<SpanData> spans = verify(expectedSpans, false); assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); - assertFalse(Span.current().getSpanContext().isValid()); } @Test @@ -154,7 +152,7 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport { }; assertThrows(CamelExecutionException.class, () -> template.sendBody("asyncmock:fail", "Hello World")); - assertFalse(Span.current().getSpanContext().isValid()); + awaitInvalidSpanContext(); List<SpanData> spans = verify(expectedSpans, false); assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); @@ -180,19 +178,19 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport { // sync pipeline template.sendBody("direct:start", "Hello World"); + awaitInvalidSpanContext(); List<SpanData> spans = verify(expectedSpans, false); assertEquals(spans.get(0).getParentSpanId(), spans.get(3).getSpanId()); assertEquals(spans.get(1).getParentSpanId(), spans.get(3).getSpanId()); assertEquals(spans.get(2).getParentSpanId(), spans.get(3).getSpanId()); - assertFalse(Span.current().getSpanContext().isValid()); } @Test void testContextDoesNotLeak() { for (int i = 0; i < 30; i++) { template.sendBody("asyncmock3:start", String.valueOf(i)); - assertFalse(Span.current().getSpanContext().isValid()); + awaitInvalidSpanContext(); } verifyTraceSpanNumbers(30, 11); @@ -256,7 +254,12 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport { } - assertFalse(Span.current().getSpanContext().isValid(), errorMessage); + Awaitility.await() + .alias(errorMessage) + .atMost(5, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .pollDelay(0, TimeUnit.MILLISECONDS) + .until(() -> !Span.current().getSpanContext().isValid()); } private static class AsyncMockComponent extends MockComponent { @@ -355,4 +358,14 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport { private static void assertCurrentSpan(Exchange exchange) { assertEquals(Span.current().getSpanContext().getSpanId(), ActiveSpanManager.getSpan(exchange).spanId()); } + + private void awaitInvalidSpanContext() { + Awaitility.await() + .alias("Span.current().getSpanContext().isValid() should eventually return false") + .atMost(5, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .pollDelay(0, TimeUnit.MILLISECONDS) + .until(() -> !Span.current().getSpanContext().isValid()); + } + }