This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7d79ee922d0886df036703e5c62e6940de7b8a54
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Jun 11 18:54:02 2019 +0200

    CAMEL-13369: Message History EIP now supports filtering nodes and to keep a 
copy of the traced message.
---
 .../main/java/org/apache/camel/MessageHistory.java |  5 ++
 .../apache/camel/spi/MessageHistoryFactory.java    | 23 +++++-
 .../camel/processor/CamelInternalProcessor.java    |  6 +-
 .../org/apache/camel/impl/DefaultCamelContext.java |  2 +-
 .../DefaultAsyncProcessorAwaitManagerTest.java     |  6 +-
 .../processor/MessageHistoryCopyMessageTest.java   | 84 ++++++++++++++++++++++
 .../processor/MessageHistoryStepOnlyTest.java      | 80 +++++++++++++++++++++
 .../camel/support/DefaultMessageHistory.java       | 16 +++++
 .../support}/DefaultMessageHistoryFactory.java     | 48 +++++++++++--
 9 files changed, 259 insertions(+), 11 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/MessageHistory.java 
b/core/camel-api/src/main/java/org/apache/camel/MessageHistory.java
index 93b4fb2..c4545ef 100644
--- a/core/camel-api/src/main/java/org/apache/camel/MessageHistory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/MessageHistory.java
@@ -47,4 +47,9 @@ public interface MessageHistory {
      */
     void nodeProcessingDone();
 
+    /**
+     * A read-only copy of the message at the point of this history (if this 
has been enabled).
+     */
+    Message getMessage();
+
 }
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java
index e7f37aa..3c3c9b6 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.MessageHistory;
 import org.apache.camel.NamedNode;
 
@@ -30,7 +31,27 @@ public interface MessageHistoryFactory {
      * @param routeId   the route id
      * @param node      the node in the route
      * @param timestamp the time the message processed at this node.
+     * @param exchange  the current exchange
      * @return a new {@link MessageHistory}
      */
-    MessageHistory newMessageHistory(String routeId, NamedNode node, long 
timestamp);
+    MessageHistory newMessageHistory(String routeId, NamedNode node, long 
timestamp, Exchange exchange);
+
+    boolean isCopyMessage();
+
+    /**
+     * Sets whether to make a copy of the message in the {@link 
MessageHistory}.
+     * By default this is turned off. Beware that you should not mutate or 
change the content
+     * on the copied message, as its purpose is as a read-only view of the 
message.
+     */
+    void setCopyMessage(boolean copyMessage);
+
+    String getNodePattern();
+
+    /**
+     * An optional pattern to filter which nodes to trace in this message 
history. By default all nodes are included.
+     * To only include nodes that are Step EIPs then use the EIP shortname, eg 
step.
+     * You can also include multiple nodes separated by comma, eg 
step,wiretap,to
+     */
+    void setNodePattern(String nodePattern);
+
 }
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index ba7fd2e..27faa0b 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -642,8 +642,10 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
                 }
             }
 
-            MessageHistory history = factory.newMessageHistory(targetRouteId, 
definition, System.currentTimeMillis());
-            list.add(history);
+            MessageHistory history = factory.newMessageHistory(targetRouteId, 
definition, System.currentTimeMillis(), exchange);
+            if (history != null) {
+                list.add(history);
+            }
             return history;
         }
 
diff --git 
a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java 
b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 6b7ac7e..d47a107 100644
--- 
a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ 
b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -41,7 +41,7 @@ import org.apache.camel.impl.engine.DefaultInflightRepository;
 import org.apache.camel.impl.engine.DefaultInjector;
 import org.apache.camel.impl.engine.DefaultLanguageResolver;
 import org.apache.camel.impl.engine.DefaultManagementNameStrategy;
-import org.apache.camel.impl.engine.DefaultMessageHistoryFactory;
+import org.apache.camel.support.DefaultMessageHistoryFactory;
 import org.apache.camel.impl.engine.DefaultNodeIdFactory;
 import org.apache.camel.impl.engine.DefaultPackageScanClassResolver;
 import org.apache.camel.impl.engine.DefaultProcessorFactory;
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java
index edbb158..8646b82 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java
@@ -23,7 +23,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.MessageHistory;
 import org.apache.camel.NamedNode;
 import org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager;
-import org.apache.camel.impl.engine.DefaultMessageHistoryFactory;
+import org.apache.camel.support.DefaultMessageHistoryFactory;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.support.DefaultExchange;
@@ -78,7 +78,7 @@ public class DefaultAsyncProcessorAwaitManagerTest {
         LinkedList<MessageHistory> messageHistories = new LinkedList<>();
         messageHistories.add(MESSAGE_HISTORY_FACTORY.newMessageHistory(null,
                 new MockNamedNode().withId(null),
-                0));
+                0, null));
         exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories);
         AsyncProcessorAwaitManager.AwaitThread awaitThread = 
defaultAsyncProcessorAwaitManager.browse().iterator().next();
         assertThat(awaitThread.getRouteId(), is(nullValue()));
@@ -92,7 +92,7 @@ public class DefaultAsyncProcessorAwaitManagerTest {
         LinkedList<MessageHistory> messageHistories = new LinkedList<>();
         
messageHistories.add(MESSAGE_HISTORY_FACTORY.newMessageHistory("routeId",
                 new MockNamedNode().withId("nodeId"),
-                0));
+                0, null));
         exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories);
         AsyncProcessorAwaitManager.AwaitThread awaitThread = 
defaultAsyncProcessorAwaitManager.browse().iterator().next();
         assertThat(awaitThread.getRouteId(), is("routeId"));
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/MessageHistoryCopyMessageTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/MessageHistoryCopyMessageTest.java
new file mode 100644
index 0000000..5249a14
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/MessageHistoryCopyMessageTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.MessageHistory;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class MessageHistoryCopyMessageTest extends ContextTestSupport {
+
+    @Test
+    public void testCopyMessage() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+        Exchange out = template.request("direct:start", e -> {
+            e.getMessage().setBody("Hello World");
+        });
+
+        assertMockEndpointsSatisfied();
+
+        // only the step eips are in the history
+        List<MessageHistory> history = 
out.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+        assertNotNull(history);
+        assertEquals(3, history.size());
+        assertEquals("step", history.get(0).getNode().getShortName());
+        assertEquals("a", history.get(0).getNode().getId());
+        assertEquals("Hello World", history.get(0).getMessage().getBody());
+        assertEquals("step", history.get(1).getNode().getShortName());
+        assertEquals("b", history.get(1).getNode().getId());
+        assertEquals("Bye World", history.get(1).getMessage().getBody());
+        assertEquals("step", history.get(2).getNode().getShortName());
+        assertEquals("bar", history.get(2).getNode().getId());
+        assertEquals("Hi World", history.get(2).getMessage().getBody());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setMessageHistory(true);
+                context.getMessageHistoryFactory().setNodePattern("step");
+                context.getMessageHistoryFactory().setCopyMessage(true);
+
+                from("direct:start")
+                        .step("a")
+                            .transform().constant("Bye World")
+                            .to("mock:a")
+                        .end()
+                        .step("b")
+                            .transform().constant("Hi World")
+                            .to("direct:bar")
+                            .to("mock:b")
+                        .end();
+
+                from("direct:bar")
+                    .step("bar")
+                        .to("log:bar")
+                        .to("mock:bar")
+                    .end();
+            }
+        };
+    }
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/MessageHistoryStepOnlyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/MessageHistoryStepOnlyTest.java
new file mode 100644
index 0000000..b5a73b9
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/MessageHistoryStepOnlyTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.MessageHistory;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class MessageHistoryStepOnlyTest extends ContextTestSupport {
+
+    @Test
+    public void testStepOnly() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+        Exchange out = template.request("direct:start", e -> {
+            e.getMessage().setBody("Hello World");
+        });
+
+        assertMockEndpointsSatisfied();
+
+        // only the step eips are in the history
+        List<MessageHistory> history = 
out.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+        assertNotNull(history);
+        assertEquals(3, history.size());
+        assertEquals("step", history.get(0).getNode().getShortName());
+        assertEquals("a", history.get(0).getNode().getId());
+        assertEquals("step", history.get(1).getNode().getShortName());
+        assertEquals("b", history.get(1).getNode().getId());
+        assertEquals("step", history.get(2).getNode().getShortName());
+        assertEquals("bar", history.get(2).getNode().getId());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setMessageHistory(true);
+                context.getMessageHistoryFactory().setNodePattern("step");
+
+                from("direct:start")
+                        .step("a")
+                            .to("log:foo")
+                            .to("mock:a")
+                        .end()
+                        .step("b")
+                            .to("direct:bar")
+                            .to("mock:b")
+                        .end();
+
+                from("direct:bar")
+                    .step("bar")
+                        .to("log:bar")
+                        .to("mock:bar")
+                    .end();
+            }
+        };
+    }
+}
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistory.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistory.java
index 2abbe3f..e6ac8d7 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistory.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.support;
 
+import org.apache.camel.Message;
 import org.apache.camel.MessageHistory;
 import org.apache.camel.NamedNode;
 
@@ -28,19 +29,27 @@ public class DefaultMessageHistory implements 
MessageHistory {
     private final NamedNode node;
     private final String nodeId;
     private final long timestamp;
+    private final Message message;
     private long elapsed;
 
     public DefaultMessageHistory(String routeId, NamedNode node, long 
timestamp) {
+        this(routeId, node, timestamp, null);
+    }
+
+    public DefaultMessageHistory(String routeId, NamedNode node, long 
timestamp, Message message) {
         this.routeId = routeId;
         this.node = node;
         this.nodeId = node.getId();
         this.timestamp = timestamp;
+        this.message = message;
     }
 
+    @Override
     public String getRouteId() {
         return routeId;
     }
 
+    @Override
     public NamedNode getNode() {
         return node;
     }
@@ -50,10 +59,12 @@ public class DefaultMessageHistory implements 
MessageHistory {
         return timestamp;
     }
 
+    @Override
     public long getElapsed() {
         return elapsed;
     }
 
+    @Override
     public void nodeProcessingDone() {
         if (timestamp > 0) {
             elapsed = System.currentTimeMillis() - timestamp;
@@ -61,6 +72,11 @@ public class DefaultMessageHistory implements MessageHistory 
{
     }
 
     @Override
+    public Message getMessage() {
+        return message;
+    }
+
+    @Override
     public String toString() {
         return "DefaultMessageHistory["
                 + "routeId=" + routeId
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultMessageHistoryFactory.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistoryFactory.java
similarity index 51%
rename from 
core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultMessageHistoryFactory.java
rename to 
core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistoryFactory.java
index 2125e5b..9864f2a 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultMessageHistoryFactory.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessageHistoryFactory.java
@@ -14,17 +14,57 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.impl.engine;
+package org.apache.camel.support;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.MessageHistory;
 import org.apache.camel.NamedNode;
 import org.apache.camel.spi.MessageHistoryFactory;
-import org.apache.camel.support.DefaultMessageHistory;
 
 public class DefaultMessageHistoryFactory implements MessageHistoryFactory {
 
+    private boolean copyMessage;
+    private String nodePattern;
+
+    @Override
+    public MessageHistory newMessageHistory(String routeId, NamedNode node, 
long timestamp, Exchange exchange) {
+        if (nodePattern != null) {
+            String name = node.getShortName();
+            String[] parts = nodePattern.split(",");
+            for (String part : parts) {
+                boolean match = PatternHelper.matchPattern(name, part);
+                if (!match) {
+                    return null;
+                }
+            }
+        }
+
+        Message target = null;
+        if (copyMessage) {
+            target = exchange.getMessage().copy();
+        }
+
+        return new DefaultMessageHistory(routeId, node, timestamp, target);
+    }
+
+    @Override
+    public boolean isCopyMessage() {
+        return copyMessage;
+    }
+
+    @Override
+    public void setCopyMessage(boolean copyMessage) {
+        this.copyMessage = copyMessage;
+    }
+
+    @Override
+    public String getNodePattern() {
+        return nodePattern;
+    }
+
     @Override
-    public MessageHistory newMessageHistory(String routeId, NamedNode node, 
long timestamp) {
-        return new DefaultMessageHistory(routeId, node, timestamp);
+    public void setNodePattern(String nodePattern) {
+        this.nodePattern = nodePattern;
     }
 }

Reply via email to