Repository: camel Updated Branches: refs/heads/master 614a87add -> 0c9693980
CAMEL-9389: Introduce a MessageHistoryFactory Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/18915b42 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/18915b42 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/18915b42 Branch: refs/heads/master Commit: 18915b42a5faeae3af8517056e100481826061bf Parents: 614a87a Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Dec 4 12:23:39 2015 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Dec 4 12:23:39 2015 +0000 ---------------------------------------------------------------------- .../java/org/apache/camel/CamelContext.java | 15 ++++++++ .../apache/camel/impl/DefaultCamelContext.java | 10 ++++++ .../impl/DefaultMessageHistoryFactory.java | 31 ++++++++++++++++ .../camel/processor/CamelInternalProcessor.java | 8 +++-- .../processor/interceptor/DefaultChannel.java | 5 ++- .../apache/camel/spi/MessageHistoryFactory.java | 38 ++++++++++++++++++++ 6 files changed, 103 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/18915b42/camel-core/src/main/java/org/apache/camel/CamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java index 7c2840f..501f6da 100644 --- a/camel-core/src/main/java/org/apache/camel/CamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java @@ -53,6 +53,7 @@ import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.ManagementMBeanAssembler; import org.apache.camel.spi.ManagementNameStrategy; import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.spi.MessageHistoryFactory; import org.apache.camel.spi.ModelJAXBContextFactory; import org.apache.camel.spi.NodeIdFactory; import 
org.apache.camel.spi.PackageScanClassResolver; @@ -1379,6 +1380,20 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { void setProcessorFactory(ProcessorFactory processorFactory); /** + * Gets the current {@link org.apache.camel.spi.MessageHistoryFactory} + * + * @return the factory + */ + public MessageHistoryFactory getMessageHistoryFactory(); + + /** + * Sets a custom {@link org.apache.camel.spi.MessageHistoryFactory} + * + * @param messageHistoryFactory the custom factory + */ + public void setMessageHistoryFactory(MessageHistoryFactory messageHistoryFactory); + + /** * Gets the current {@link Debugger} * * @return the debugger http://git-wip-us.apache.org/repos/asf/camel/blob/18915b42/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index f7b3033..b43193a 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -130,6 +130,7 @@ import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.ManagementMBeanAssembler; import org.apache.camel.spi.ManagementNameStrategy; import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.spi.MessageHistoryFactory; import org.apache.camel.spi.ModelJAXBContextFactory; import org.apache.camel.spi.NodeIdFactory; import org.apache.camel.spi.PackageScanClassResolver; @@ -243,6 +244,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon private ServicePool<Endpoint, PollingConsumer> pollingConsumerServicePool = new SharedPollingConsumerServicePool(100); private NodeIdFactory nodeIdFactory = new DefaultNodeIdFactory(); private ProcessorFactory processorFactory; + private 
MessageHistoryFactory messageHistoryFactory = new DefaultMessageHistoryFactory(); private InterceptStrategy defaultTracer; private InterceptStrategy defaultBacklogTracer; private InterceptStrategy defaultBacklogDebugger; @@ -4012,6 +4014,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon this.processorFactory = processorFactory; } + public MessageHistoryFactory getMessageHistoryFactory() { + return messageHistoryFactory; + } + + public void setMessageHistoryFactory(MessageHistoryFactory messageHistoryFactory) { + this.messageHistoryFactory = messageHistoryFactory; + } + public Debugger getDebugger() { return debugger; } http://git-wip-us.apache.org/repos/asf/camel/blob/18915b42/camel-core/src/main/java/org/apache/camel/impl/DefaultMessageHistoryFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessageHistoryFactory.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessageHistoryFactory.java new file mode 100644 index 0000000..42ffeec --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessageHistoryFactory.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.Date; + +import org.apache.camel.MessageHistory; +import org.apache.camel.NamedNode; +import org.apache.camel.spi.MessageHistoryFactory; + +public class DefaultMessageHistoryFactory implements MessageHistoryFactory { + + @Override + public MessageHistory newMessageHistory(String routeId, NamedNode node, Date timestamp) { + return new DefaultMessageHistory(routeId, node, timestamp); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/18915b42/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 856a225..32f8b38 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -30,7 +30,6 @@ import org.apache.camel.Route; import org.apache.camel.StatefulService; import org.apache.camel.StreamCache; import org.apache.camel.api.management.PerformanceCounter; -import org.apache.camel.impl.DefaultMessageHistory; import org.apache.camel.management.DelegatePerformanceCounter; import org.apache.camel.management.mbean.ManagedPerformanceCounter; import org.apache.camel.model.ProcessorDefinition; @@ -39,6 +38,7 @@ import org.apache.camel.processor.interceptor.BacklogDebugger; import org.apache.camel.processor.interceptor.BacklogTracer; import org.apache.camel.processor.interceptor.DefaultBacklogTracerEventMessage; import org.apache.camel.spi.InflightRepository; +import org.apache.camel.spi.MessageHistoryFactory; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.RoutePolicy; import 
org.apache.camel.spi.StreamCachingStrategy; @@ -710,10 +710,12 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { @SuppressWarnings("unchecked") public static class MessageHistoryAdvice implements CamelInternalProcessorAdvice<MessageHistory> { + private final MessageHistoryFactory factory; private final ProcessorDefinition<?> definition; private final String routeId; - public MessageHistoryAdvice(ProcessorDefinition<?> definition) { + public MessageHistoryAdvice(MessageHistoryFactory factory, ProcessorDefinition<?> definition) { + this.factory = factory; this.definition = definition; this.routeId = ProcessorDefinitionHelper.getRouteId(definition); } @@ -725,7 +727,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { list = new ArrayList<MessageHistory>(); exchange.setProperty(Exchange.MESSAGE_HISTORY, list); } - MessageHistory history = new DefaultMessageHistory(routeId, definition, new Date()); + MessageHistory history = factory.newMessageHistory(routeId, definition, new Date()); list.add(history); return history; } http://git-wip-us.apache.org/repos/asf/camel/blob/18915b42/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java index 56d0967..1afeb45 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java @@ -27,6 +27,7 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Channel; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.impl.DefaultMessageHistoryFactory; import org.apache.camel.model.ModelChannel; import 
org.apache.camel.model.OnCompletionDefinition; import org.apache.camel.model.OnExceptionDefinition; @@ -38,6 +39,7 @@ import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.InterceptorToAsyncProcessorBridge; import org.apache.camel.processor.WrapProcessor; import org.apache.camel.spi.InterceptStrategy; +import org.apache.camel.spi.MessageHistoryFactory; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.OrderedComparator; import org.apache.camel.util.ServiceHelper; @@ -242,7 +244,8 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann if (routeContext.isMessageHistory()) { // add message history advice - addAdvice(new MessageHistoryAdvice(targetOutputDef)); + MessageHistoryFactory factory = camelContext.getMessageHistoryFactory(); + addAdvice(new MessageHistoryAdvice(factory, targetOutputDef)); } // the regular tracer is not a task on internalProcessor as this is not really needed http://git-wip-us.apache.org/repos/asf/camel/blob/18915b42/camel-core/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java b/camel-core/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java new file mode 100644 index 0000000..e8020c8 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/MessageHistoryFactory.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import java.util.Date; + +import org.apache.camel.MessageHistory; +import org.apache.camel.NamedNode; + +/** + * A factory to create {@link MessageHistory} instances. + */ +public interface MessageHistoryFactory { + + /** + * Creates a new {@link MessageHistory} + * + * @param routeId the route id + * @param node the node in the route + * @param timestamp the time the message was processed at this node. + * @return a new {@link MessageHistory} + */ + MessageHistory newMessageHistory(String routeId, NamedNode node, Date timestamp); +}