CAMEL-9940: ProducerTemplate - Make extract result set part of UoW

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1cca6b7c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1cca6b7c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1cca6b7c

Branch: refs/heads/master
Commit: 1cca6b7c1d5445c3d75ae6b1d3abbf6c1568eafa
Parents: b4a6ad4
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed May 4 09:04:17 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed May 4 14:07:56 2016 +0200

----------------------------------------------------------------------
 .../camel/impl/DefaultProducerTemplate.java     | 40 ++++++++++++---
 .../org/apache/camel/impl/ProducerCache.java    | 52 ++++++++++++++++----
 2 files changed, 76 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1cca6b7c/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
index aee4973..a5c8867 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
@@ -32,6 +32,7 @@ import org.apache.camel.Message;
 import org.apache.camel.NoSuchEndpointException;
 import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.processor.ConvertBodyProcessor;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.CamelContextHelper;
@@ -324,37 +325,44 @@ public class DefaultProducerTemplate extends 
ServiceSupport implements ProducerT
     }
 
     public <T> T requestBody(Object body, Class<T> type) {
-        Object answer = requestBody(body);
+        Exchange exchange = producerCache.send(getMandatoryDefaultEndpoint(), 
ExchangePattern.InOut, createSetBodyProcessor(body), 
createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
     public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) {
-        Object answer = requestBody(endpoint, body);
+        Exchange exchange = producerCache.send(endpoint, 
ExchangePattern.InOut, createSetBodyProcessor(body), 
createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
     public <T> T requestBody(String endpointUri, Object body, Class<T> type) {
-        Object answer = requestBody(endpointUri, body);
+        Exchange exchange = 
producerCache.send(resolveMandatoryEndpoint(endpointUri), 
ExchangePattern.InOut, createSetBodyProcessor(body), 
createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
     public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String 
header, Object headerValue, Class<T> type) {
-        Object answer = requestBodyAndHeader(endpoint, body, header, 
headerValue);
+        Exchange exchange = producerCache.send(endpoint, 
ExchangePattern.InOut, createBodyAndHeaderProcessor(body, header, headerValue), 
createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
     public <T> T requestBodyAndHeader(String endpointUri, Object body, String 
header, Object headerValue, Class<T> type) {
-        Object answer = requestBodyAndHeader(endpointUri, body, header, 
headerValue);
+        Exchange exchange = 
producerCache.send(resolveMandatoryEndpoint(endpointUri), 
ExchangePattern.InOut, createBodyAndHeaderProcessor(body, header, headerValue), 
createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
     public <T> T requestBodyAndHeaders(String endpointUri, Object body, 
Map<String, Object> headers, Class<T> type) {
-        Object answer = requestBodyAndHeaders(endpointUri, body, headers);
+        Exchange exchange = 
producerCache.send(resolveMandatoryEndpoint(endpointUri), 
ExchangePattern.InOut, createBodyAndHeaders(body, headers), 
createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
     public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, 
Map<String, Object> headers, Class<T> type) {
-        Object answer = requestBodyAndHeaders(endpoint, body, headers);
+        Exchange exchange = producerCache.send(endpoint, 
ExchangePattern.InOut, createBodyAndHeaders(body, headers), 
createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
@@ -436,6 +444,20 @@ public class DefaultProducerTemplate extends 
ServiceSupport implements ProducerT
         };
     }
 
+    protected Processor createBodyAndHeaders(final Object body, final 
Map<String, Object> headers) {
+        return new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                if (headers != null) {
+                    for (Map.Entry<String, Object> header : 
headers.entrySet()) {
+                        in.setHeader(header.getKey(), header.getValue());
+                    }
+                }
+                in.setBody(body);
+            }
+        };
+    }
+
     protected Processor createBodyAndPropertyProcessor(final Object body, 
final String property, final Object propertyValue) {
         return new Processor() {
             public void process(Exchange exchange) {
@@ -455,6 +477,10 @@ public class DefaultProducerTemplate extends 
ServiceSupport implements ProducerT
         };
     }
 
+    protected Processor createConvertBodyProcessor(final Class<?> type) {
+        return new ConvertBodyProcessor(type);
+    }
+
     protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
         Endpoint endpoint = camelContext.getEndpoint(endpointUri);
         if (endpoint == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/1cca6b7c/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index c016ea4..ebf641b 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.impl;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.AsyncCallback;
@@ -26,12 +28,12 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.FailedToCreateProducerException;
-import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.ProducerCallback;
 import org.apache.camel.ServicePoolAware;
-import org.apache.camel.processor.UnitOfWorkProducer;
+import org.apache.camel.processor.CamelInternalProcessor;
+import org.apache.camel.processor.Pipeline;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.ServicePool;
 import org.apache.camel.support.ServiceSupport;
@@ -203,7 +205,7 @@ public class ProducerCache extends ServiceSupport {
      * @param exchange the exchange to send
      */
     public void send(Endpoint endpoint, Exchange exchange) {
-        sendExchange(endpoint, null, null, exchange);
+        sendExchange(endpoint, null, null, null, exchange);
     }
 
     /**
@@ -219,7 +221,7 @@ public class ProducerCache extends ServiceSupport {
      * @return the exchange
      */
     public Exchange send(Endpoint endpoint, Processor processor) {
-        return sendExchange(endpoint, null, processor, null);
+        return sendExchange(endpoint, null, processor, null, null);
     }
 
     /**
@@ -236,7 +238,25 @@ public class ProducerCache extends ServiceSupport {
      * @return the exchange
      */
     public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor 
processor) {
-        return sendExchange(endpoint, pattern, processor, null);
+        return sendExchange(endpoint, pattern, processor, null, null);
+    }
+
+    /**
+     * Sends an exchange to an endpoint using a supplied
+     * {@link Processor} to populate the exchange
+     * <p>
+     * This method will <b>not</b> throw an exception. If processing of the 
given
+     * Exchange failed then the exception is stored on the return Exchange
+     *
+     * @param endpoint the endpoint to send the exchange to
+     * @param pattern the message {@link ExchangePattern} such as
+     *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
+     * @param processor the transformer used to populate the new exchange
+     * @param resultProcessor a processor to process the exchange when the 
send is complete.
+     * @return the exchange
+     */
+    public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor 
processor, Processor resultProcessor) {
+        return sendExchange(endpoint, pattern, processor, resultProcessor, 
null);
     }
 
     /**
@@ -377,7 +397,7 @@ public class ProducerCache extends ServiceSupport {
     }
 
     protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern 
pattern,
-                                    final Processor processor, Exchange 
exchange) {
+                                    final Processor processor, final Processor 
resultProcessor, Exchange exchange) {
         return doInProducer(endpoint, exchange, pattern, new 
ProducerCallback<Exchange>() {
             public Exchange doInProducer(Producer producer, Exchange exchange, 
ExchangePattern pattern) {
                 if (exchange == null) {
@@ -408,9 +428,23 @@ public class ProducerCache extends ServiceSupport {
                         watch = new StopWatch();
                         
EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
                     }
-                    // ensure we run in an unit of work
-                    Producer target = new UnitOfWorkProducer(producer);
-                    target.process(exchange);
+
+                    // if we have a result processor then wrap in pipeline to 
execute both of them in sequence
+                    Processor target;
+                    if (resultProcessor != null) {
+                        List<Processor> processors = new 
ArrayList<Processor>(2);
+                        processors.add(producer);
+                        processors.add(resultProcessor);
+                        target = Pipeline.newInstance(getCamelContext(), 
processors);
+                    } else {
+                        target = producer;
+                    }
+
+                    // wrap in unit of work
+                    CamelInternalProcessor internal = new 
CamelInternalProcessor(target);
+                    internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
+
+                    internal.process(exchange);
                 } catch (Throwable e) {
                     // ensure exceptions is caught and set on the exchange
                     exchange.setException(e);

Reply via email to