Repository: camel Updated Branches: refs/heads/camel-2.16.x 355adb6b9 -> 0ea3ce9a1 refs/heads/master 848315144 -> bee0d3b04
CAMEL-9573: useOriginalMessage should deal with parent/sub UoW. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c710a2b4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c710a2b4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c710a2b4 Branch: refs/heads/master Commit: c710a2b43b7732d589bb2f0c12a03c7e8c513d93 Parents: 8483151 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Feb 7 14:23:27 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Feb 7 14:23:27 2016 +0100 ---------------------------------------------------------------------- .../apache/camel/model/ProcessorDefinition.java | 11 +++++++-- .../camel/processor/MulticastProcessor.java | 2 +- .../camel/processor/OnCompletionProcessor.java | 2 +- .../camel/processor/RedeliveryErrorHandler.java | 2 +- .../org/apache/camel/util/ExchangeHelper.java | 26 ++++++++++++++++++++ 5 files changed, 38 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c710a2b4/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index eacb304..0705d69 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -54,6 +54,7 @@ import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.model.language.LanguageExpression; import org.apache.camel.model.language.SimpleExpression; import org.apache.camel.model.rest.RestDefinition; +import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.InterceptEndpointProcessor; import 
org.apache.camel.processor.Pipeline; import org.apache.camel.processor.aggregate.AggregationStrategy; @@ -534,10 +535,16 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> processor = createProcessor(routeContext); } + // unwrap internal processor so we can set id on the actual processor + Processor idProcessor = processor; + if (processor instanceof CamelInternalProcessor) { + idProcessor = ((CamelInternalProcessor) processor).getProcessor(); + } + // inject id - if (processor instanceof IdAware) { + if (idProcessor instanceof IdAware) { String id = this.idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); - ((IdAware) processor).setId(id); + ((IdAware) idProcessor).setId(id); } if (processor == null) { http://git-wip-us.apache.org/repos/asf/camel/blob/c710a2b4/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index acc35b9..65a4d51 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -874,7 +874,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor // must copy results at this point if (subExchange != null) { if (stoppedOnException) { - // if we stopped due an exception then only propagte the exception + // if we stopped due an exception then only propagate the exception original.setException(subExchange.getException()); } else { // copy the current result to original so it will contain this result of this eip http://git-wip-us.apache.org/repos/asf/camel/blob/c710a2b4/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java index bd06382..f0835b9 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java @@ -203,7 +203,7 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces if (useOriginalBody) { LOG.trace("Using the original IN message instead of current"); - Message original = exchange.getUnitOfWork().getOriginalInMessage(); + Message original = ExchangeHelper.getOriginalInMessage(exchange); answer.setIn(original); } http://git-wip-us.apache.org/repos/asf/camel/blob/c710a2b4/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java index 5e48c3e..bdc9346 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java @@ -918,7 +918,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // prepare original IN body if it should be moved instead of current body if (data.useOriginalInMessage) { log.trace("Using the original IN message instead of current"); - Message original = exchange.getUnitOfWork().getOriginalInMessage(); + Message original = ExchangeHelper.getOriginalInMessage(exchange); exchange.setIn(original); if (exchange.hasOut()) { log.trace("Removing the out message to avoid some uncertain behavior"); 
http://git-wip-us.apache.org/repos/asf/camel/blob/c710a2b4/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java index 505f04e..9641026 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java @@ -878,6 +878,32 @@ public final class ExchangeHelper { } } + /** + * Gets the original IN {@link Message} this Unit of Work was started with. + * <p/> + * The original message is only returned if the option {@link org.apache.camel.RuntimeConfiguration#isAllowUseOriginalMessage()} + * is enabled. If it is disabled, then <tt>null</tt> is returned. + * + * @return the original IN {@link Message}, or <tt>null</tt> if using original message is disabled. + */ + public static Message getOriginalInMessage(Exchange exchange) { + Message answer = null; + + // try parent first + UnitOfWork uow = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); + if (uow != null) { + answer = uow.getOriginalInMessage(); + } + // fallback to the current exchange + if (answer == null) { + uow = exchange.getUnitOfWork(); + if (uow != null) { + answer = uow.getOriginalInMessage(); + } + } + return answer; + } + @SuppressWarnings("unchecked") private static Map<String, Object> safeCopy(Map<String, Object> properties) { if (properties == null) {