CAMEL-9573: useOriginalMessage should deal with parent/sub UoW.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/155fae5b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/155fae5b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/155fae5b Branch: refs/heads/camel-2.16.x Commit: 155fae5b7bd08ec7d1492f2d4c4a1b9e8a0476bf Parents: 355adb6 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Feb 7 14:23:27 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Feb 7 18:18:58 2016 +0100 ---------------------------------------------------------------------- .../apache/camel/model/ProcessorDefinition.java | 11 +++++++-- .../camel/processor/MulticastProcessor.java | 2 +- .../camel/processor/OnCompletionProcessor.java | 2 +- .../camel/processor/RedeliveryErrorHandler.java | 2 +- .../org/apache/camel/util/ExchangeHelper.java | 26 ++++++++++++++++++++ 5 files changed, 38 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/155fae5b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 7bb512f..c525881 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -54,6 +54,7 @@ import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.model.language.LanguageExpression; import org.apache.camel.model.language.SimpleExpression; import org.apache.camel.model.rest.RestDefinition; +import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.InterceptEndpointProcessor; import org.apache.camel.processor.Pipeline; import org.apache.camel.processor.aggregate.AggregationStrategy; @@ -534,10 +535,16 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> processor = createProcessor(routeContext); } + // unwrap internal processor so we can set id on the actual processor + Processor idProcessor = processor; + if (processor instanceof CamelInternalProcessor) { + idProcessor = ((CamelInternalProcessor) processor).getProcessor(); + } + // inject id - if (processor instanceof IdAware) { + if (idProcessor instanceof IdAware) { String id = this.idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); - ((IdAware) processor).setId(id); + ((IdAware) idProcessor).setId(id); } if (processor == null) { http://git-wip-us.apache.org/repos/asf/camel/blob/155fae5b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index acc35b9..65a4d51 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -874,7 +874,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor // must copy results at this point if (subExchange != null) { if (stoppedOnException) { - // if we stopped due an exception then only propagte the exception + // if we stopped due an exception then only propagate the exception original.setException(subExchange.getException()); } else { // copy the current result to original so it will contain this result of this eip http://git-wip-us.apache.org/repos/asf/camel/blob/155fae5b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java index bd06382..f0835b9 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java @@ -203,7 +203,7 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces if (useOriginalBody) { LOG.trace("Using the original IN message instead of current"); - Message original = exchange.getUnitOfWork().getOriginalInMessage(); + Message original = ExchangeHelper.getOriginalInMessage(exchange); answer.setIn(original); } http://git-wip-us.apache.org/repos/asf/camel/blob/155fae5b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java index ff94da7..9697dc6 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java @@ -882,7 +882,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // prepare original IN body if it should be moved instead of current body if (data.useOriginalInMessage) { log.trace("Using the original IN message instead of current"); - Message original = exchange.getUnitOfWork().getOriginalInMessage(); + Message original = ExchangeHelper.getOriginalInMessage(exchange); exchange.setIn(original); if (exchange.hasOut()) { log.trace("Removing the out message to avoid some uncertain behavior"); http://git-wip-us.apache.org/repos/asf/camel/blob/155fae5b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java index 63d8013..d14ab6f 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java @@ -864,6 +864,32 @@ public final class ExchangeHelper { } } + /** + * Gets the original IN {@link Message} this Unit of Work was started with. + * <p/> + * The original message is only returned if the option {@link org.apache.camel.RuntimeConfiguration#isAllowUseOriginalMessage()} + * is enabled. If its disabled, then <tt>null</tt> is returned. + * + * @return the original IN {@link Message}, or <tt>null</tt> if using original message is disabled. + */ + public static Message getOriginalInMessage(Exchange exchange) { + Message answer = null; + + // try parent first + UnitOfWork uow = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); + if (uow != null) { + answer = uow.getOriginalInMessage(); + } + // fallback to the current exchange + if (answer == null) { + uow = exchange.getUnitOfWork(); + if (uow != null) { + answer = uow.getOriginalInMessage(); + } + } + return answer; + } + @SuppressWarnings("unchecked") private static Map<String, Object> safeCopy(Map<String, Object> properties) { if (properties == null) {