This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 1d41b0b5fc0 CAMEL-17721: camel-core - MDC custom keys should preserve existing va… (#12465) 1d41b0b5fc0 is described below commit 1d41b0b5fc03ac7bd285b3be63c06cf54ea297fc Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Dec 18 10:39:06 2023 +0100 CAMEL-17721: camel-core - MDC custom keys should preserve existing va… (#12465) * CAMEL-17721: camel-core - MDC custom keys should preserve existing value during routing, so users can alter its value. CAMEL-20246: WireTap should not create correlated exchange copy --- .../apache/camel/impl/engine/MDCUnitOfWork.java | 42 ++++++++++++- .../apache/camel/processor/WireTapProcessor.java | 2 + .../{MDCSplitTest.java => MDCCustomKeysTest.java} | 71 ++++++++++++++++------ .../org/apache/camel/processor/MDCSplitTest.java | 1 + .../{MDCSplitTest.java => WireTapMDCTest.java} | 64 +++++++++++-------- .../ROOT/pages/camel-4x-upgrade-guide-4_4.adoc | 10 +++ 6 files changed, 147 insertions(+), 43 deletions(-) diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java index e7c6c748849..d2cdffc6cb4 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java @@ -211,9 +211,42 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service { } } + /** + * Clear custom MDC values based on the configured MDC pattern + */ + protected void clearCustom(Exchange exchange) { + // clear custom patterns + if (pattern != null) { + + // only clear if the UoW is the parent UoW (split, multicast and other EIPs create child exchanges with their own UoW) + if (exchange != null) { + String cid = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + if (cid != null && !cid.equals(exchange.getExchangeId())) { + return; + } + } + + Map<String, String> mdc = MDC.getCopyOfContextMap(); + if (mdc != null) { + if ("*".equals(pattern)) { + MDC.clear(); + } else { + final String[] patterns = pattern.split(","); + mdc.forEach((k, v) -> { + if (PatternHelper.matchPatterns(k, patterns)) { + MDC.remove(k); + } + }); + } + } + } + } + @Override public void done(Exchange exchange) { super.done(exchange); + // clear custom first + clearCustom(exchange); clear(); } @@ -227,6 +260,8 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service { @Override public void reset() { super.reset(); + // clear custom first + clearCustom(null); clear(); } @@ -309,7 +344,12 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service { MDC.put(MDC_CAMEL_CONTEXT_ID, camelContextId); } if (custom != null) { - custom.forEach(MDC::put); + // keep existing custom value to not override + custom.forEach((k, v) -> { + if (MDC.get(k) == null) { + MDC.put(k, v); + } + }); } } // need to setup the routeId finally diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java index c025f169672..755724ad85c 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -250,6 +250,8 @@ public class WireTapProcessor extends AsyncProcessorSupport private Exchange configureCopyExchange(Exchange exchange) { // must use a copy as we dont want it to cause side effects of the original exchange Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false); + // should not be correlated, but we needed to copy without handover + copy.removeProperty(ExchangePropertyKey.CORRELATION_ID); // set MEP to InOnly as this wire tap is a fire and forget copy.setPattern(ExchangePattern.InOnly); // move OUT to IN if needed diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCCustomKeysTest.java similarity index 72% copy from core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java copy to core/camel-core/src/test/java/org/apache/camel/processor/MDCCustomKeysTest.java index 897eafb1438..8ca7df20ef3 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCCustomKeysTest.java @@ -26,16 +26,39 @@ import org.slf4j.MDC; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -public class MDCSplitTest extends ContextTestSupport { +public class MDCCustomKeysTest extends ContextTestSupport { + + private MdcCheckerProcessor checker1 = new MdcCheckerProcessor("N/A"); + private MdcCheckerProcessor checker2 = new MdcCheckerProcessor("World"); @Test public void testMdcPreserved() throws Exception { + + MockEndpoint mock = getMockEndpoint("mock:end"); + mock.expectedBodiesReceived("A"); + + checker1.reset(); + checker2.reset(); + template.sendBody("direct:a", "A"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testMdcPreservedTwo() throws Exception { MockEndpoint mock = getMockEndpoint("mock:end"); - mock.expectedMessageCount(1); + mock.expectedBodiesReceived("A", "B"); + + checker1.reset(); + checker2.reset(); + template.sendBody("direct:a", "A"); - template.sendBody("direct:a", "A,B"); + checker1.reset(); + checker2.reset(); + template.sendBody("direct:a", "B"); assertMockEndpointsSatisfied(); } @@ -50,22 +73,25 @@ public class MDCSplitTest extends ContextTestSupport { context.setUseBreadcrumb(true); context.setMDCLoggingKeysPattern("custom*,my*"); - MdcCheckerProcessor checker = new MdcCheckerProcessor(); + from("direct:a").process(e -> { + + // custom should be empty + String hello = MDC.get("custom.hello"); + assertNull(hello); - from("direct:a").routeId("route-async").process(e -> { // custom is propagated - MDC.put("custom.hello", "World"); + MDC.put("custom.hello", "N/A"); // foo is propagated due we use the same thread MDC.put("foo", "Bar"); // myKey is propagated MDC.put("myKey", "Baz"); - }).process(checker) + }).process(checker1) .to("log:foo") - .split(body().tokenize(",")) - .process(checker) - .end() + .process(e -> { + MDC.put("custom.hello", "World"); + }) + .process(checker2) .to("mock:end"); - } }; } @@ -75,7 +101,6 @@ public class MDCSplitTest extends ContextTestSupport { */ private static class MdcCheckerProcessor implements Processor { - private String routeId = "route-async"; private String exchangeId; private String messageId; private String breadcrumbId; @@ -83,10 +108,25 @@ public class MDCSplitTest extends ContextTestSupport { private Long threadId; private String foo; + private final String expected; + + public MdcCheckerProcessor(String expected) { + this.expected = expected; + } + + public void reset() { + exchangeId = null; + messageId = null; + breadcrumbId = null; + contextId = null; + threadId = null; + foo = null; + } + @Override public void process(Exchange exchange) throws Exception { // custom is propagated as its pattern matches - assertEquals("World", MDC.get("custom.hello")); + assertEquals(expected, MDC.get("custom.hello")); assertEquals("Baz", MDC.get("myKey")); if (foo != null) { @@ -102,11 +142,6 @@ public class MDCSplitTest extends ContextTestSupport { } else { threadId = Thread.currentThread().getId(); } - - if (routeId != null) { - assertEquals(routeId, MDC.get("camel.routeId")); - } - if (exchangeId != null) { assertNotEquals(exchangeId, MDC.get("camel.exchangeId")); } else { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java index 897eafb1438..a78d59dd2b4 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java @@ -62,6 +62,7 @@ public class MDCSplitTest extends ContextTestSupport { }).process(checker) .to("log:foo") .split(body().tokenize(",")) + .to("log:line") .process(checker) .end() .to("mock:end"); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapMDCTest.java similarity index 69% copy from core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java copy to core/camel-core/src/test/java/org/apache/camel/processor/WireTapMDCTest.java index 897eafb1438..ccc020cee44 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCSplitTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapMDCTest.java @@ -25,17 +25,16 @@ import org.junit.jupiter.api.Test; import org.slf4j.MDC; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -public class MDCSplitTest extends ContextTestSupport { +public class WireTapMDCTest extends ContextTestSupport { @Test public void testMdcPreserved() throws Exception { MockEndpoint mock = getMockEndpoint("mock:end"); - mock.expectedMessageCount(1); + mock.expectedMessageCount(2); - template.sendBody("direct:a", "A,B"); + template.sendBody("seda:a", "A"); assertMockEndpointsSatisfied(); } @@ -50,22 +49,30 @@ public class MDCSplitTest extends ContextTestSupport { context.setUseBreadcrumb(true); context.setMDCLoggingKeysPattern("custom*,my*"); - MdcCheckerProcessor checker = new MdcCheckerProcessor(); - - from("direct:a").routeId("route-async").process(e -> { - // custom is propagated - MDC.put("custom.hello", "World"); - // foo is propagated due we use the same thread - MDC.put("foo", "Bar"); - // myKey is propagated - MDC.put("myKey", "Baz"); - }).process(checker) - .to("log:foo") - .split(body().tokenize(",")) + MdcCheckerProcessor checker = new MdcCheckerProcessor("route-a", "World", "MyValue"); + MdcCheckerProcessor checker2 = new MdcCheckerProcessor("route-b", "Moon", "MyValue2"); + + from("seda:a").routeId("route-a") + .process(e -> { + MDC.put("custom.hello", "World"); + MDC.put("foo", "Bar"); + MDC.put("myKey", "MyValue"); + }) + .process(checker) + .to("log:a") + .wireTap("direct:b") .process(checker) - .end() .to("mock:end"); + from("direct:b").routeId("route-b") + .process(e -> { + MDC.put("custom.hello", "Moon"); + MDC.put("foo", "Bar2"); + MDC.put("myKey", "MyValue2"); + }) + .process(checker2) + .to("log:b") + .to("mock:end"); } }; } @@ -75,7 +82,6 @@ public class MDCSplitTest extends ContextTestSupport { */ private static class MdcCheckerProcessor implements Processor { - private String routeId = "route-async"; private String exchangeId; private String messageId; private String breadcrumbId; @@ -83,11 +89,21 @@ public class MDCSplitTest extends ContextTestSupport { private Long threadId; private String foo; + private String expected1; + private String expected2; + private String expected3; + + public MdcCheckerProcessor(String expected1, String expected2, String expected3) { + this.expected1 = expected1; + this.expected2 = expected2; + this.expected3 = expected3; + } + @Override public void process(Exchange exchange) throws Exception { // custom is propagated as its pattern matches - assertEquals("World", MDC.get("custom.hello")); - assertEquals("Baz", MDC.get("myKey")); + assertEquals(expected2, MDC.get("custom.hello")); + assertEquals(expected3, MDC.get("myKey")); if (foo != null) { // foo propagated because its the same thread @@ -103,19 +119,19 @@ public class MDCSplitTest extends ContextTestSupport { threadId = Thread.currentThread().getId(); } - if (routeId != null) { - assertEquals(routeId, MDC.get("camel.routeId")); + if (expected1 != null) { + assertEquals(expected1, MDC.get("camel.routeId")); } if (exchangeId != null) { - assertNotEquals(exchangeId, MDC.get("camel.exchangeId")); + assertEquals(exchangeId, MDC.get("camel.exchangeId")); } else { exchangeId = MDC.get("camel.exchangeId"); assertTrue(exchangeId != null && exchangeId.length() > 0); } if (messageId != null) { - assertNotEquals(messageId, MDC.get("camel.messageId")); + assertEquals(messageId, MDC.get("camel.messageId")); } else { messageId = MDC.get("camel.messageId"); assertTrue(messageId != null && messageId.length() > 0); diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc index bfc0737228a..be28d246969 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc @@ -15,6 +15,16 @@ The method `getCreated` is now deprecated. Access to the time-related informatio The `lookup` method in `org.apache.camel.component.properties.PropertiesLookup` now has a 2nd parameter for the default value. +==== WireTap EIP + +The copied exchange is no longer having exchange property CORRELATION_ID set that links to the original exchange. +The reason is that this link should only be for EIPs with sub exchanges such as Splitter and Multicast. + +==== MDC logging + +When using custom MDC keys (need to configure `MDCLoggingKeysPattern`) then these custom keys are cleared at the end of routing. +Also, custom keys is allowed to be changed during routing, using the `MDC.set(myKey, ...)` Java API. + === camel-main The route controller configuration has been moved from general main to its own group.