CAMEL-5301: Recipient list supports exchangePattern uri option in the endpoints.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/99ca9a5e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/99ca9a5e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/99ca9a5e

Branch: refs/heads/master
Commit: 99ca9a5e814e4b54ad32c92b90abf3a752166385
Parents: e3b3142
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue Nov 11 13:06:47 2014 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Nov 11 13:37:16 2014 +0100

----------------------------------------------------------------------
 .../camel/processor/RecipientListProcessor.java | 16 ++++-
 .../camel/processor/RecipientListMEPTest.java   | 62 ++++++++++++++++++++
 2 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/99ca9a5e/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 70a3853..492b553 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -24,8 +24,10 @@ import java.util.concurrent.ExecutorService;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.RouteContext;
@@ -70,6 +72,7 @@ public class RecipientListProcessor extends 
MulticastProcessor {
         private Processor prepared;
         private final Exchange exchange;
         private final ProducerCache producerCache;
+        private final ExchangePattern originalExchangePattern;
 
         private RecipientProcessorExchangePair(int index, ProducerCache 
producerCache, Endpoint endpoint, Producer producer,
                                                Processor prepared, Exchange 
exchange) {
@@ -79,6 +82,7 @@ public class RecipientListProcessor extends 
MulticastProcessor {
             this.producer = producer;
             this.prepared = prepared;
             this.exchange = exchange;
+            this.originalExchangePattern = exchange.getPattern();
         }
 
         public int getIndex() {
@@ -103,12 +107,22 @@ public class RecipientListProcessor extends 
MulticastProcessor {
             exchange.setProperty(Exchange.RECIPIENT_LIST_ENDPOINT, 
endpoint.getEndpointUri());
             // ensure stream caching is reset
             MessageHelper.resetStreamCache(exchange.getIn());
+            // if the MEP on the endpoint is different then
+            if (endpoint instanceof DefaultEndpoint) {
+                ExchangePattern pattern = ((DefaultEndpoint) 
endpoint).getExchangePattern();
+                if (pattern != null && !pattern.equals(exchange.getPattern())) 
{
+                    LOG.trace("Using exchangePattern: {} on exchange: {}", 
pattern, exchange);
+                    exchange.setPattern(pattern);
+                }
+            }
         }
 
         public void done() {
             LOG.trace("RecipientProcessorExchangePair #{} done: {}", index, 
exchange);
-            // when we are done we should release back in pool
             try {
+                // preserve original MEP
+                exchange.setPattern(originalExchangePattern);
+                // when we are done we should release back in pool
                 producerCache.releaseProducer(endpoint, producer);
             } catch (Exception e) {
                 if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/99ca9a5e/camel-core/src/test/java/org/apache/camel/processor/RecipientListMEPTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/RecipientListMEPTest.java 
b/camel-core/src/test/java/org/apache/camel/processor/RecipientListMEPTest.java
new file mode 100644
index 0000000..c33305e
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/RecipientListMEPTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class RecipientListMEPTest extends ContextTestSupport {
+
+    public void testMEPInOnly() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World", 
"Hello Again");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World", 
"Bye World");
+
+        template.sendBody("direct:start", "Hello World");
+        template.sendBody("direct:start", "Hello Again");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testMEPInOutOnly() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World", 
"Hello Again");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World", 
"Bye World");
+
+        String out = template.requestBody("direct:start", "Hello World", 
String.class);
+        assertEquals("Bye World", out);
+
+        out = template.requestBody("direct:start", "Hello Again", 
String.class);
+        assertEquals("Bye World", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .recipientList().constant("seda:foo?exchangePattern=InOut")
+                    .to("mock:result");
+
+                from("seda:foo")
+                    .to("mock:foo")
+                    .transform().constant("Bye World");
+            }
+        };
+    }
+}

Reply via email to