This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 7d79ee922d0886df036703e5c62e6940de7b8a54 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Jun 11 18:54:02 2019 +0200 CAMEL-13369: Message History EIP now supports filtering nodes and to keep a copy of the traced message. --- .../main/java/org/apache/camel/MessageHistory.java | 5 ++ .../apache/camel/spi/MessageHistoryFactory.java | 23 +++++- .../camel/processor/CamelInternalProcessor.java | 6 +- .../org/apache/camel/impl/DefaultCamelContext.java | 2 +- .../DefaultAsyncProcessorAwaitManagerTest.java | 6 +- .../processor/MessageHistoryCopyMessageTest.java | 84 ++++++++++++++++++++++ .../processor/MessageHistoryStepOnlyTest.java | 80 +++++++++++++++++++++ .../camel/support/DefaultMessageHistory.java | 16 +++++ .../support}/DefaultMessageHistoryFactory.java | 48 +++++++++++-- 9 files changed, 259 insertions(+), 11 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/MessageHistory.java b/core/camel-api/src/main/java/org/apache/camel/MessageHistory.java index 93b4fb2..c4545ef 100644 --- a/core/camel-api/src/main/java/org/apache/camel/MessageHistory.java +++ b/core/camel-api/src/main/java/org/apache/camel/MessageHistory.java @@ -47,4 +47,9 @@ public interface MessageHistory { */ void nodeProcessingDone(); + /** + * A read-only copy of the message at the point of this history (if this has been enabled). + */ + Message getMessage(); + } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java index e7f37aa..3c3c9b6 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java @@ -16,6 +16,7 @@ */ package org.apache.camel.spi; +import org.apache.camel.Exchange; import org.apache.camel.MessageHistory; import org.apache.camel.NamedNode; @@ -30,7 +31,27 @@ public interface MessageHistoryFactory { * @param routeId the route id * @param node the node in the route * @param timestamp the time the message processed at this node. + * @param exchange the current exchange * @return a new {@link MessageHistory} */ - MessageHistory newMessageHistory(String routeId, NamedNode node, long timestamp); + MessageHistory newMessageHistory(String routeId, NamedNode node, long timestamp, Exchange exchange); + + boolean isCopyMessage(); + + /** + * Sets whether to make a copy of the message in the {@link MessageHistory}. + * By default this is turned off. Beware that you should not mutate or change the content + * on the copied message, as its purpose is as a read-only view of the message. + */ + void setCopyMessage(boolean copyMessage); + + String getNodePattern(); + + /** + * An optional pattern to filter which nodes to trace in this message history. By default all nodes are included. + * To only include nodes that are Step EIPs then use the EIP shortname, eg step. + * You can also include multiple nodes separated by comma, eg step,wiretap,to + */ + void setNodePattern(String nodePattern); + } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index ba7fd2e..27faa0b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -642,8 +642,10 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { } } - MessageHistory history = factory.newMessageHistory(targetRouteId, definition, System.currentTimeMillis()); - list.add(history); + MessageHistory history = factory.newMessageHistory(targetRouteId, definition, System.currentTimeMillis(), exchange); + if (history != null) { + list.add(history); + } return history; } diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 6b7ac7e..d47a107 100644 --- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -41,7 +41,7 @@ import org.apache.camel.impl.engine.DefaultInflightRepository; import org.apache.camel.impl.engine.DefaultInjector; import org.apache.camel.impl.engine.DefaultLanguageResolver; import org.apache.camel.impl.engine.DefaultManagementNameStrategy; -import org.apache.camel.impl.engine.DefaultMessageHistoryFactory; +import org.apache.camel.support.DefaultMessageHistoryFactory; import org.apache.camel.impl.engine.DefaultNodeIdFactory; import org.apache.camel.impl.engine.DefaultPackageScanClassResolver; import org.apache.camel.impl.engine.DefaultProcessorFactory; diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java index edbb158..8646b82 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java @@ -23,7 +23,7 @@ import org.apache.camel.Exchange; import org.apache.camel.MessageHistory; import org.apache.camel.NamedNode; import org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager; -import org.apache.camel.impl.engine.DefaultMessageHistoryFactory; +import org.apache.camel.support.DefaultMessageHistoryFactory; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.MessageHistoryFactory; import org.apache.camel.support.DefaultExchange; @@ -78,7 +78,7 @@ public class DefaultAsyncProcessorAwaitManagerTest { LinkedList<MessageHistory> messageHistories = new LinkedList<>(); messageHistories.add(MESSAGE_HISTORY_FACTORY.newMessageHistory(null, new MockNamedNode().withId(null), - 0)); + 0, null)); exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories); AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next(); assertThat(awaitThread.getRouteId(), is(nullValue())); @@ -92,7 +92,7 @@ public class DefaultAsyncProcessorAwaitManagerTest { LinkedList<MessageHistory> messageHistories = new LinkedList<>(); messageHistories.add(MESSAGE_HISTORY_FACTORY.newMessageHistory("routeId", new MockNamedNode().withId("nodeId"), - 0)); + 0, null)); exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories); AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next(); assertThat(awaitThread.getRouteId(), is("routeId")); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MessageHistoryCopyMessageTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MessageHistoryCopyMessageTest.java new file mode 100644 index 0000000..5249a14 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MessageHistoryCopyMessageTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.List; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.MessageHistory; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class MessageHistoryCopyMessageTest extends ContextTestSupport { + + @Test + public void testCopyMessage() throws Exception { + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:b").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + + Exchange out = template.request("direct:start", e -> { + e.getMessage().setBody("Hello World"); + }); + + assertMockEndpointsSatisfied(); + + // only the step eips are in the history + List<MessageHistory> history = out.getProperty(Exchange.MESSAGE_HISTORY, List.class); + assertNotNull(history); + assertEquals(3, history.size()); + assertEquals("step", history.get(0).getNode().getShortName()); + assertEquals("a", history.get(0).getNode().getId()); + assertEquals("Hello World", history.get(0).getMessage().getBody()); + assertEquals("step", history.get(1).getNode().getShortName()); + assertEquals("b", history.get(1).getNode().getId()); + assertEquals("Bye World", history.get(1).getMessage().getBody()); + assertEquals("step", history.get(2).getNode().getShortName()); + assertEquals("bar", history.get(2).getNode().getId()); + assertEquals("Hi World", history.get(2).getMessage().getBody()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setMessageHistory(true); + context.getMessageHistoryFactory().setNodePattern("step"); + context.getMessageHistoryFactory().setCopyMessage(true); + + from("direct:start") + .step("a") + .transform().constant("Bye World") + .to("mock:a") + .end() + .step("b") + .transform().constant("Hi World") + .to("direct:bar") + .to("mock:b") + .end(); + + from("direct:bar") + .step("bar") + .to("log:bar") + .to("mock:bar") + .end(); + } + }; + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MessageHistoryStepOnlyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MessageHistoryStepOnlyTest.java new file mode 100644 index 0000000..b5a73b9 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MessageHistoryStepOnlyTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.List; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.MessageHistory; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +public class MessageHistoryStepOnlyTest extends ContextTestSupport { + + @Test + public void testStepOnly() throws Exception { + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:b").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + + Exchange out = template.request("direct:start", e -> { + e.getMessage().setBody("Hello World"); + }); + + assertMockEndpointsSatisfied(); + + // only the step eips are in the history + List<MessageHistory> history = out.getProperty(Exchange.MESSAGE_HISTORY, List.class); + assertNotNull(history); + assertEquals(3, history.size()); + assertEquals("step", history.get(0).getNode().getShortName()); + assertEquals("a", history.get(0).getNode().getId()); + assertEquals("step", history.get(1).getNode().getShortName()); + assertEquals("b", history.get(1).getNode().getId()); + assertEquals("step", history.get(2).getNode().getShortName()); + assertEquals("bar", history.get(2).getNode().getId()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setMessageHistory(true); + context.getMessageHistoryFactory().setNodePattern("step"); + + from("direct:start") + .step("a") + .to("log:foo") + .to("mock:a") + .end() + .step("b") + .to("direct:bar") + .to("mock:b") + .end(); + + from("direct:bar") + .step("bar") + .to("log:bar") + .to("mock:bar") + .end(); + } + }; + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistory.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistory.java index 2abbe3f..e6ac8d7 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistory.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistory.java @@ -16,6 +16,7 @@ */ package org.apache.camel.support; +import org.apache.camel.Message; import org.apache.camel.MessageHistory; import org.apache.camel.NamedNode; @@ -28,19 +29,27 @@ public class DefaultMessageHistory implements MessageHistory { private final NamedNode node; private final String nodeId; private final long timestamp; + private final Message message; private long elapsed; public DefaultMessageHistory(String routeId, NamedNode node, long timestamp) { + this(routeId, node, timestamp, null); + } + + public DefaultMessageHistory(String routeId, NamedNode node, long timestamp, Message message) { this.routeId = routeId; this.node = node; this.nodeId = node.getId(); this.timestamp = timestamp; + this.message = message; } + @Override public String getRouteId() { return routeId; } + @Override public NamedNode getNode() { return node; } @@ -50,10 +59,12 @@ public class DefaultMessageHistory implements MessageHistory { return timestamp; } + @Override public long getElapsed() { return elapsed; } + @Override public void nodeProcessingDone() { if (timestamp > 0) { elapsed = System.currentTimeMillis() - timestamp; @@ -61,6 +72,11 @@ public class DefaultMessageHistory implements MessageHistory { } @Override + public Message getMessage() { + return message; + } + + @Override public String toString() { return "DefaultMessageHistory[" + "routeId=" + routeId diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultMessageHistoryFactory.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistoryFactory.java similarity index 51% rename from core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultMessageHistoryFactory.java rename to core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistoryFactory.java index 2125e5b..9864f2a 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultMessageHistoryFactory.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistoryFactory.java @@ -14,17 +14,57 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.impl.engine; +package org.apache.camel.support; +import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.MessageHistory; import org.apache.camel.NamedNode; import org.apache.camel.spi.MessageHistoryFactory; -import org.apache.camel.support.DefaultMessageHistory; public class DefaultMessageHistoryFactory implements MessageHistoryFactory { + private boolean copyMessage; + private String nodePattern; + + @Override + public MessageHistory newMessageHistory(String routeId, NamedNode node, long timestamp, Exchange exchange) { + if (nodePattern != null) { + String name = node.getShortName(); + String[] parts = nodePattern.split(","); + for (String part : parts) { + boolean match = PatternHelper.matchPattern(name, part); + if (!match) { + return null; + } + } + } + + Message target = null; + if (copyMessage) { + target = exchange.getMessage().copy(); + } + + return new DefaultMessageHistory(routeId, node, timestamp, target); + } + + @Override + public boolean isCopyMessage() { + return copyMessage; + } + + @Override + public void setCopyMessage(boolean copyMessage) { + this.copyMessage = copyMessage; + } + + @Override + public String getNodePattern() { + return nodePattern; + } + @Override - public MessageHistory newMessageHistory(String routeId, NamedNode node, long timestamp) { - return new DefaultMessageHistory(routeId, node, timestamp); + public void setNodePattern(String nodePattern) { + this.nodePattern = nodePattern; } }