CAMEL-9940: ProducerTemplate - Make extract result set part of UoW
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1cca6b7c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1cca6b7c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1cca6b7c Branch: refs/heads/master Commit: 1cca6b7c1d5445c3d75ae6b1d3abbf6c1568eafa Parents: b4a6ad4 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed May 4 09:04:17 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed May 4 14:07:56 2016 +0200 ---------------------------------------------------------------------- .../camel/impl/DefaultProducerTemplate.java | 40 ++++++++++++--- .../org/apache/camel/impl/ProducerCache.java | 52 ++++++++++++++++---- 2 files changed, 76 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1cca6b7c/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java index aee4973..a5c8867 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java @@ -32,6 +32,7 @@ import org.apache.camel.Message; import org.apache.camel.NoSuchEndpointException; import org.apache.camel.Processor; import org.apache.camel.ProducerTemplate; +import org.apache.camel.processor.ConvertBodyProcessor; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.CamelContextHelper; @@ -324,37 +325,44 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT } public <T> T requestBody(Object body, Class<T> type) { - Object answer = requestBody(body); + Exchange exchange = producerCache.send(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, createSetBodyProcessor(body), createConvertBodyProcessor(type)); + Object answer = extractResultBody(exchange); return camelContext.getTypeConverter().convertTo(type, answer); } public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) { - Object answer = requestBody(endpoint, body); + Exchange exchange = producerCache.send(endpoint, ExchangePattern.InOut, createSetBodyProcessor(body), createConvertBodyProcessor(type)); + Object answer = extractResultBody(exchange); return camelContext.getTypeConverter().convertTo(type, answer); } public <T> T requestBody(String endpointUri, Object body, Class<T> type) { - Object answer = requestBody(endpointUri, body); + Exchange exchange = producerCache.send(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, createSetBodyProcessor(body), createConvertBodyProcessor(type)); + Object answer = extractResultBody(exchange); return camelContext.getTypeConverter().convertTo(type, answer); } public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) { - Object answer = requestBodyAndHeader(endpoint, body, header, headerValue); + Exchange exchange = producerCache.send(endpoint, ExchangePattern.InOut, createBodyAndHeaderProcessor(body, header, headerValue), createConvertBodyProcessor(type)); + Object answer = extractResultBody(exchange); return camelContext.getTypeConverter().convertTo(type, answer); } public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) { - Object answer = requestBodyAndHeader(endpointUri, body, header, headerValue); + Exchange exchange = producerCache.send(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, createBodyAndHeaderProcessor(body, header, headerValue), createConvertBodyProcessor(type)); + Object answer = extractResultBody(exchange); return camelContext.getTypeConverter().convertTo(type, answer); } public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) { - Object answer = requestBodyAndHeaders(endpointUri, body, headers); + Exchange exchange = producerCache.send(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, createBodyAndHeaders(body, headers), createConvertBodyProcessor(type)); + Object answer = extractResultBody(exchange); return camelContext.getTypeConverter().convertTo(type, answer); } public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) { - Object answer = requestBodyAndHeaders(endpoint, body, headers); + Exchange exchange = producerCache.send(endpoint, ExchangePattern.InOut, createBodyAndHeaders(body, headers), createConvertBodyProcessor(type)); + Object answer = extractResultBody(exchange); return camelContext.getTypeConverter().convertTo(type, answer); } @@ -436,6 +444,20 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT }; } + protected Processor createBodyAndHeaders(final Object body, final Map<String, Object> headers) { + return new Processor() { + public void process(Exchange exchange) { + Message in = exchange.getIn(); + if (headers != null) { + for (Map.Entry<String, Object> header : headers.entrySet()) { + in.setHeader(header.getKey(), header.getValue()); + } + } + in.setBody(body); + } + }; + } + protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) { return new Processor() { public void process(Exchange exchange) { @@ -455,6 +477,10 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT }; } + protected Processor createConvertBodyProcessor(final Class<?> type) { + return new ConvertBodyProcessor(type); + } + protected Endpoint resolveMandatoryEndpoint(String endpointUri) { Endpoint endpoint = camelContext.getEndpoint(endpointUri); if (endpoint == null) { http://git-wip-us.apache.org/repos/asf/camel/blob/1cca6b7c/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java index c016ea4..ebf641b 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java @@ -16,6 +16,8 @@ */ package org.apache.camel.impl; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.camel.AsyncCallback; @@ -26,12 +28,12 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.FailedToCreateProducerException; -import org.apache.camel.PollingConsumer; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.ProducerCallback; import org.apache.camel.ServicePoolAware; -import org.apache.camel.processor.UnitOfWorkProducer; +import org.apache.camel.processor.CamelInternalProcessor; +import org.apache.camel.processor.Pipeline; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ServicePool; import org.apache.camel.support.ServiceSupport; @@ -203,7 +205,7 @@ public class ProducerCache extends ServiceSupport { * @param exchange the exchange to send */ public void send(Endpoint endpoint, Exchange exchange) { - sendExchange(endpoint, null, null, exchange); + sendExchange(endpoint, null, null, null, exchange); } /** @@ -219,7 +221,7 @@ public class ProducerCache extends ServiceSupport { * @return the exchange */ public Exchange send(Endpoint endpoint, Processor processor) { - return sendExchange(endpoint, null, processor, null); + return sendExchange(endpoint, null, processor, null, null); } /** @@ -236,7 +238,25 @@ public class ProducerCache extends ServiceSupport { * @return the exchange */ public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) { - return sendExchange(endpoint, pattern, processor, null); + return sendExchange(endpoint, pattern, processor, null, null); + } + + /** + * Sends an exchange to an endpoint using a supplied + * {@link Processor} to populate the exchange + * <p> + * This method will <b>not</b> throw an exception. If processing of the given + * Exchange failed then the exception is stored on the return Exchange + * + * @param endpoint the endpoint to send the exchange to + * @param pattern the message {@link ExchangePattern} such as + * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} + * @param processor the transformer used to populate the new exchange + * @param resultProcessor a processor to process the exchange when the send is complete. + * @return the exchange + */ + public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor) { + return sendExchange(endpoint, pattern, processor, resultProcessor, null); } /** @@ -377,7 +397,7 @@ public class ProducerCache extends ServiceSupport { } protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern, - final Processor processor, Exchange exchange) { + final Processor processor, final Processor resultProcessor, Exchange exchange) { return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() { public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) { if (exchange == null) { @@ -408,9 +428,23 @@ public class ProducerCache extends ServiceSupport { watch = new StopWatch(); EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); } - // ensure we run in an unit of work - Producer target = new UnitOfWorkProducer(producer); - target.process(exchange); + + // if we have a result processor then wrap in pipeline to execute both of them in sequence + Processor target; + if (resultProcessor != null) { + List<Processor> processors = new ArrayList<Processor>(2); + processors.add(producer); + processors.add(resultProcessor); + target = Pipeline.newInstance(getCamelContext(), processors); + } else { + target = producer; + } + + // wrap in unit of work + CamelInternalProcessor internal = new CamelInternalProcessor(target); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); + + internal.process(exchange); } catch (Throwable e) { // ensure exceptions is caught and set on the exchange exchange.setException(e);