Repository: camel
Updated Branches:
  refs/heads/master 614a87add -> 0c9693980


CAMEL-9389: Introduce a MessageHistoryFactory


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/18915b42
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/18915b42
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/18915b42

Branch: refs/heads/master
Commit: 18915b42a5faeae3af8517056e100481826061bf
Parents: 614a87a
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Dec 4 12:23:39 2015 +0000
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Dec 4 12:23:39 2015 +0000

----------------------------------------------------------------------
 .../java/org/apache/camel/CamelContext.java     | 15 ++++++++
 .../apache/camel/impl/DefaultCamelContext.java  | 10 ++++++
 .../impl/DefaultMessageHistoryFactory.java      | 31 ++++++++++++++++
 .../camel/processor/CamelInternalProcessor.java |  8 +++--
 .../processor/interceptor/DefaultChannel.java   |  5 ++-
 .../apache/camel/spi/MessageHistoryFactory.java | 38 ++++++++++++++++++++
 6 files changed, 103 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/18915b42/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java 
b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index 7c2840f..501f6da 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -53,6 +53,7 @@ import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.ManagementMBeanAssembler;
 import org.apache.camel.spi.ManagementNameStrategy;
 import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.ModelJAXBContextFactory;
 import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.PackageScanClassResolver;
@@ -1379,6 +1380,20 @@ public interface CamelContext extends 
SuspendableService, RuntimeConfiguration {
     void setProcessorFactory(ProcessorFactory processorFactory);
 
     /**
+     * Gets the current {@link org.apache.camel.spi.MessageHistoryFactory}
+     *
+     * @return the factory
+     */
+    public MessageHistoryFactory getMessageHistoryFactory();
+
+    /**
+     * Sets a custom {@link org.apache.camel.spi.MessageHistoryFactory}
+     *
+     * @param messageHistoryFactory the custom factory
+     */
+    public void setMessageHistoryFactory(MessageHistoryFactory 
messageHistoryFactory);
+
+    /**
      * Gets the current {@link Debugger}
      *
      * @return the debugger

http://git-wip-us.apache.org/repos/asf/camel/blob/18915b42/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index f7b3033..b43193a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -130,6 +130,7 @@ import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.ManagementMBeanAssembler;
 import org.apache.camel.spi.ManagementNameStrategy;
 import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.ModelJAXBContextFactory;
 import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.PackageScanClassResolver;
@@ -243,6 +244,7 @@ public class DefaultCamelContext extends ServiceSupport 
implements ModelCamelCon
     private ServicePool<Endpoint, PollingConsumer> pollingConsumerServicePool 
= new SharedPollingConsumerServicePool(100);
     private NodeIdFactory nodeIdFactory = new DefaultNodeIdFactory();
     private ProcessorFactory processorFactory;
+    private MessageHistoryFactory messageHistoryFactory = new 
DefaultMessageHistoryFactory();
     private InterceptStrategy defaultTracer;
     private InterceptStrategy defaultBacklogTracer;
     private InterceptStrategy defaultBacklogDebugger;
@@ -4012,6 +4014,14 @@ public class DefaultCamelContext extends ServiceSupport 
implements ModelCamelCon
         this.processorFactory = processorFactory;
     }
 
+    public MessageHistoryFactory getMessageHistoryFactory() {
+        return messageHistoryFactory;
+    }
+
+    public void setMessageHistoryFactory(MessageHistoryFactory 
messageHistoryFactory) {
+        this.messageHistoryFactory = messageHistoryFactory;
+    }
+
     public Debugger getDebugger() {
         return debugger;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/18915b42/camel-core/src/main/java/org/apache/camel/impl/DefaultMessageHistoryFactory.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessageHistoryFactory.java
 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessageHistoryFactory.java
new file mode 100644
index 0000000..42ffeec
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessageHistoryFactory.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.Date;
+
+import org.apache.camel.MessageHistory;
+import org.apache.camel.NamedNode;
+import org.apache.camel.spi.MessageHistoryFactory;
+
+public class DefaultMessageHistoryFactory implements MessageHistoryFactory {
+
+    @Override
+    public MessageHistory newMessageHistory(String routeId, NamedNode node, 
Date timestamp) {
+        return new DefaultMessageHistory(routeId, node, timestamp);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/18915b42/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 856a225..32f8b38 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -30,7 +30,6 @@ import org.apache.camel.Route;
 import org.apache.camel.StatefulService;
 import org.apache.camel.StreamCache;
 import org.apache.camel.api.management.PerformanceCounter;
-import org.apache.camel.impl.DefaultMessageHistory;
 import org.apache.camel.management.DelegatePerformanceCounter;
 import org.apache.camel.management.mbean.ManagedPerformanceCounter;
 import org.apache.camel.model.ProcessorDefinition;
@@ -39,6 +38,7 @@ import org.apache.camel.processor.interceptor.BacklogDebugger;
 import org.apache.camel.processor.interceptor.BacklogTracer;
 import org.apache.camel.processor.interceptor.DefaultBacklogTracerEventMessage;
 import org.apache.camel.spi.InflightRepository;
+import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.StreamCachingStrategy;
@@ -710,10 +710,12 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
     @SuppressWarnings("unchecked")
     public static class MessageHistoryAdvice implements 
CamelInternalProcessorAdvice<MessageHistory> {
 
+        private final MessageHistoryFactory factory;
         private final ProcessorDefinition<?> definition;
         private final String routeId;
 
-        public MessageHistoryAdvice(ProcessorDefinition<?> definition) {
+        public MessageHistoryAdvice(MessageHistoryFactory factory, 
ProcessorDefinition<?> definition) {
+            this.factory = factory;
             this.definition = definition;
             this.routeId = ProcessorDefinitionHelper.getRouteId(definition);
         }
@@ -725,7 +727,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
                 list = new ArrayList<MessageHistory>();
                 exchange.setProperty(Exchange.MESSAGE_HISTORY, list);
             }
-            MessageHistory history = new DefaultMessageHistory(routeId, 
definition, new Date());
+            MessageHistory history = factory.newMessageHistory(routeId, 
definition, new Date());
             list.add(history);
             return history;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/18915b42/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
index 56d0967..1afeb45 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
@@ -27,6 +27,7 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Channel;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultMessageHistoryFactory;
 import org.apache.camel.model.ModelChannel;
 import org.apache.camel.model.OnCompletionDefinition;
 import org.apache.camel.model.OnExceptionDefinition;
@@ -38,6 +39,7 @@ import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.InterceptorToAsyncProcessorBridge;
 import org.apache.camel.processor.WrapProcessor;
 import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.OrderedComparator;
 import org.apache.camel.util.ServiceHelper;
@@ -242,7 +244,8 @@ public class DefaultChannel extends CamelInternalProcessor 
implements ModelChann
 
         if (routeContext.isMessageHistory()) {
             // add message history advice
-            addAdvice(new MessageHistoryAdvice(targetOutputDef));
+            MessageHistoryFactory factory = 
camelContext.getMessageHistoryFactory();
+            addAdvice(new MessageHistoryAdvice(factory, targetOutputDef));
         }
 
         // the regular tracer is not a task on internalProcessor as this is 
not really needed

http://git-wip-us.apache.org/repos/asf/camel/blob/18915b42/camel-core/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java 
b/camel-core/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java
new file mode 100644
index 0000000..e8020c8
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.Date;
+
+import org.apache.camel.MessageHistory;
+import org.apache.camel.NamedNode;
+
+/**
+ * A factory to create {@link MessageHistory} instances.
+ */
+public interface MessageHistoryFactory {
+
+    /**
+     * Creates a new {@link MessageHistory}
+     *
+     * @param routeId   the route id
+     * @param node      the node in the route
+     * @param timestamp the time the message processed at this node.
+     * @return a new {@link MessageHistory}
+     */
+    MessageHistory newMessageHistory(String routeId, NamedNode node, Date 
timestamp);
+}

Reply via email to