Repository: camel Updated Branches: refs/heads/master e3b31428c -> 64920052f
CAMEL-5301: Recipient list supports exchangePattern uri option in the endpoints. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/64920052 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/64920052 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/64920052 Branch: refs/heads/master Commit: 64920052fec03af0c632526b32f5d31e1e805dd9 Parents: 691748e Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Nov 11 13:33:10 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Nov 11 13:37:16 2014 +0100 ---------------------------------------------------------------------- .../camel/processor/RecipientListProcessor.java | 51 ++++++++++++++------ .../CxfRsProducerClientFactoryCache2Test.java | 3 +- 2 files changed, 39 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/64920052/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java index 492b553..1087da6 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java @@ -16,9 +16,13 @@ */ package org.apache.camel.processor; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.camel.CamelContext; @@ -27,13 +31,13 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.ProducerCache; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.MessageHelper; import org.apache.camel.util.ServiceHelper; +import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,17 +76,18 @@ public class RecipientListProcessor extends MulticastProcessor { private Processor prepared; private final Exchange exchange; private final ProducerCache producerCache; - private final ExchangePattern originalExchangePattern; + private final ExchangePattern pattern; + private volatile ExchangePattern originalPattern; private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint endpoint, Producer producer, - Processor prepared, Exchange exchange) { + Processor prepared, Exchange exchange, ExchangePattern pattern) { this.index = index; this.producerCache = producerCache; this.endpoint = endpoint; this.producer = producer; this.prepared = prepared; this.exchange = exchange; - this.originalExchangePattern = exchange.getPattern(); + this.pattern = pattern; } public int getIndex() { @@ -108,12 +113,10 @@ public class RecipientListProcessor extends MulticastProcessor { // ensure stream caching is reset MessageHelper.resetStreamCache(exchange.getIn()); // if the MEP on the endpoint is different then - if (endpoint instanceof DefaultEndpoint) { - ExchangePattern pattern = ((DefaultEndpoint) endpoint).getExchangePattern(); - if (pattern != null && !pattern.equals(exchange.getPattern())) { - LOG.trace("Using exchangePattern: {} on exchange: {}", pattern, exchange); - exchange.setPattern(pattern); - } + if (pattern != null) { + originalPattern = exchange.getPattern(); + LOG.trace("Using exchangePattern: {} on exchange: {}", pattern, exchange); + exchange.setPattern(pattern); } } @@ -121,7 +124,9 @@ public class RecipientListProcessor extends MulticastProcessor { LOG.trace("RecipientProcessorExchangePair #{} done: {}", index, exchange); try { // preserve original MEP - exchange.setPattern(originalExchangePattern); + if (originalPattern != null) { + exchange.setPattern(originalPattern); + } // when we are done we should release back in pool producerCache.releaseProducer(endpoint, producer); } catch (Exception e) { @@ -183,8 +188,10 @@ public class RecipientListProcessor extends MulticastProcessor { Object recipient = iter.next(); Endpoint endpoint; Producer producer; + ExchangePattern pattern; try { endpoint = resolveEndpoint(exchange, recipient); + pattern = resolveExchangePattern(exchange, recipient); producer = producerCache.acquireProducer(endpoint); } catch (Exception e) { if (isIgnoreInvalidEndpoints()) { @@ -199,7 +206,7 @@ public class RecipientListProcessor extends MulticastProcessor { } // then create the exchange pair - result.add(createProcessorExchangePair(index++, endpoint, producer, exchange)); + result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern)); } return result; @@ -208,7 +215,7 @@ public class RecipientListProcessor extends MulticastProcessor { /** * This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair instead */ - protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange) { + protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange, ExchangePattern pattern) { Processor prepared = producer; // copy exchange, and do not share the unit of work @@ -236,7 +243,7 @@ public class RecipientListProcessor extends MulticastProcessor { } // and create the pair - return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy); + return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy, pattern); } protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) { @@ -247,6 +254,22 @@ public class RecipientListProcessor extends MulticastProcessor { return ExchangeHelper.resolveEndpoint(exchange, recipient); } + protected ExchangePattern resolveExchangePattern(Exchange exchange, Object recipient) throws UnsupportedEncodingException, URISyntaxException { + // trim strings as end users might have added spaces between separators + if (recipient instanceof String) { + String s = ((String) recipient).trim(); + // see if exchangePattern is a parameter in the url + s = URISupport.normalizeUri(s); + URI url = new URI(s); + Map<String, Object> parameters = URISupport.parseParameters(url); + String pattern = (String) parameters.get("exchangePattern"); + if (pattern != null) { + return ExchangePattern.asEnum(pattern); + } + } + return null; + } + protected void doStart() throws Exception { super.doStart(); if (producerCache == null) { http://git-wip-us.apache.org/repos/asf/camel/blob/64920052/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerClientFactoryCache2Test.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerClientFactoryCache2Test.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerClientFactoryCache2Test.java index 3dd6f99..2793c36 100644 --- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerClientFactoryCache2Test.java +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerClientFactoryCache2Test.java @@ -71,7 +71,8 @@ public class CxfRsProducerClientFactoryCache2Test extends Assert { } private void doRunTest(ProducerTemplate template, final int clientPort) { - Exchange exchange = template.send("direct://http", new Processor() { + // use request as we want InOut + Exchange exchange = template.request("direct://http", new Processor() { public void process(Exchange exchange) throws Exception { exchange.setPattern(ExchangePattern.InOut); Message inMessage = exchange.getIn();