Repository: camel
Updated Branches:
  refs/heads/master e3b31428c -> 64920052f


CAMEL-5301: Recipient list supports exchangePattern uri option in the endpoints.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/64920052
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/64920052
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/64920052

Branch: refs/heads/master
Commit: 64920052fec03af0c632526b32f5d31e1e805dd9
Parents: 691748e
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue Nov 11 13:33:10 2014 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Nov 11 13:37:16 2014 +0100

----------------------------------------------------------------------
 .../camel/processor/RecipientListProcessor.java | 51 ++++++++++++++------
 .../CxfRsProducerClientFactoryCache2Test.java   |  3 +-
 2 files changed, 39 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/64920052/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 492b553..1087da6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -16,9 +16,13 @@
  */
 package org.apache.camel.processor;
 
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.CamelContext;
@@ -27,13 +31,13 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,17 +76,18 @@ public class RecipientListProcessor extends MulticastProcessor {
         private Processor prepared;
         private final Exchange exchange;
         private final ProducerCache producerCache;
-        private final ExchangePattern originalExchangePattern;
+        private final ExchangePattern pattern;
+        private volatile ExchangePattern originalPattern;
 
         private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint endpoint, Producer producer,
-                                               Processor prepared, Exchange exchange) {
+                                               Processor prepared, Exchange exchange, ExchangePattern pattern) {
             this.index = index;
             this.producerCache = producerCache;
             this.endpoint = endpoint;
             this.producer = producer;
             this.prepared = prepared;
             this.exchange = exchange;
-            this.originalExchangePattern = exchange.getPattern();
+            this.pattern = pattern;
         }
 
         public int getIndex() {
@@ -108,12 +113,10 @@ public class RecipientListProcessor extends MulticastProcessor {
             // ensure stream caching is reset
             MessageHelper.resetStreamCache(exchange.getIn());
             // if the MEP on the endpoint is different then
-            if (endpoint instanceof DefaultEndpoint) {
-                ExchangePattern pattern = ((DefaultEndpoint) endpoint).getExchangePattern();
-                if (pattern != null && !pattern.equals(exchange.getPattern())) {
-                    LOG.trace("Using exchangePattern: {} on exchange: {}", pattern, exchange);
-                    exchange.setPattern(pattern);
-                }
+            if (pattern != null) {
+                originalPattern = exchange.getPattern();
+                LOG.trace("Using exchangePattern: {} on exchange: {}", pattern, exchange);
+                exchange.setPattern(pattern);
             }
         }
 
@@ -121,7 +124,9 @@ public class RecipientListProcessor extends MulticastProcessor {
             LOG.trace("RecipientProcessorExchangePair #{} done: {}", index, exchange);
             try {
                 // preserve original MEP
-                exchange.setPattern(originalExchangePattern);
+                if (originalPattern != null) {
+                    exchange.setPattern(originalPattern);
+                }
                 // when we are done we should release back in pool
                 producerCache.releaseProducer(endpoint, producer);
             } catch (Exception e) {
@@ -183,8 +188,10 @@ public class RecipientListProcessor extends MulticastProcessor {
             Object recipient = iter.next();
             Endpoint endpoint;
             Producer producer;
+            ExchangePattern pattern;
             try {
                 endpoint = resolveEndpoint(exchange, recipient);
+                pattern = resolveExchangePattern(exchange, recipient);
                 producer = producerCache.acquireProducer(endpoint);
             } catch (Exception e) {
                 if (isIgnoreInvalidEndpoints()) {
@@ -199,7 +206,7 @@ public class RecipientListProcessor extends MulticastProcessor {
             }
 
             // then create the exchange pair
-            result.add(createProcessorExchangePair(index++, endpoint, producer, exchange));
+            result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern));
         }
 
         return result;
@@ -208,7 +215,7 @@ public class RecipientListProcessor extends MulticastProcessor {
     /**
      * This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair instead
      */
-    protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange) {
+    protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange, ExchangePattern pattern) {
         Processor prepared = producer;
 
         // copy exchange, and do not share the unit of work
@@ -236,7 +243,7 @@ public class RecipientListProcessor extends MulticastProcessor {
         }
 
         // and create the pair
-        return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy);
+        return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy, pattern);
     }
 
     protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
@@ -247,6 +254,22 @@ public class RecipientListProcessor extends MulticastProcessor {
         return ExchangeHelper.resolveEndpoint(exchange, recipient);
     }
 
+    protected ExchangePattern resolveExchangePattern(Exchange exchange, Object recipient) throws UnsupportedEncodingException, URISyntaxException {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof String) {
+            String s = ((String) recipient).trim();
+            // see if exchangePattern is a parameter in the url
+            s = URISupport.normalizeUri(s);
+            URI url = new URI(s);
+            Map<String, Object> parameters = URISupport.parseParameters(url);
+            String pattern = (String) parameters.get("exchangePattern");
+            if (pattern != null) {
+                return ExchangePattern.asEnum(pattern);
+            }
+        }
+        return null;
+    }
+
     protected void doStart() throws Exception {
         super.doStart();
         if (producerCache == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/64920052/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerClientFactoryCache2Test.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerClientFactoryCache2Test.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerClientFactoryCache2Test.java
index 3dd6f99..2793c36 100644
--- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerClientFactoryCache2Test.java
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerClientFactoryCache2Test.java
@@ -71,7 +71,8 @@ public class CxfRsProducerClientFactoryCache2Test extends Assert {
     }
 
     private void doRunTest(ProducerTemplate template, final int clientPort) {
-        Exchange exchange = template.send("direct://http", new Processor() {
+        // use request as we want InOut
+        Exchange exchange = template.request("direct://http", new Processor() {
             public void process(Exchange exchange) throws Exception {
                 exchange.setPattern(ExchangePattern.InOut);
                 Message inMessage = exchange.getIn();

Reply via email to