Repository: camel Updated Branches: refs/heads/camel-2.18.x 35f81f6b0 -> 762495717
CAMEL-10820: DefaultFluentProducerTemplate mixes up data when sending asynchronously (cherry picked from commit 05cbb33c0da05aeb4c8c70137a41d53a58c38388) Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/76249571 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/76249571 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/76249571 Branch: refs/heads/camel-2.18.x Commit: 7624957175d502027113405845fd8feeda37354b Parents: 35f81f6 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Mon Mar 6 11:26:19 2017 +0100 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Tue Mar 7 15:00:56 2017 +0100 ---------------------------------------------------------------------- .../builder/DefaultFluentProducerTemplate.java | 140 ++++++++++--------- .../org/apache/camel/util/ObjectHelper.java | 18 ++- .../builder/FluentProducerTemplateTest.java | 66 +++++++++ 3 files changed, 159 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/76249571/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java index bac078e..965afef 100644 --- a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java +++ b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java @@ -18,6 +18,7 @@ package org.apache.camel.builder; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Supplier; @@ -42,24 +43,23 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu private final ClassValue<ConvertBodyProcessor> resultProcessors; private Map<String, Object> headers; private Object body; - private Endpoint endpoint; - private Consumer<ProducerTemplate> templateCustomizer; - private Supplier<Exchange> exchangeSupplier; - private Supplier<Processor> processorSupplier; - private volatile ProducerTemplate template; - private Endpoint defaultEndpoint; + private Optional<Consumer<ProducerTemplate>> templateCustomizer; + private Optional<Supplier<Exchange>> exchangeSupplier; + private Optional<Supplier<Processor>> processorSupplier; + private Optional<Endpoint> endpoint; + private Optional<Endpoint> defaultEndpoint; private int maximumCacheSize; - private boolean eventNotifierEnabled = true; + private boolean eventNotifierEnabled; + private volatile ProducerTemplate template; public DefaultFluentProducerTemplate(CamelContext context) { this.context = context; - this.headers = null; - this.body = null; - this.endpoint = null; - this.templateCustomizer = null; - this.exchangeSupplier = null; - this.processorSupplier = () -> this::populateExchange; - this.template = null; + this.endpoint = Optional.empty(); + this.defaultEndpoint = Optional.empty(); + this.eventNotifierEnabled = true; + this.templateCustomizer = Optional.empty(); + this.exchangeSupplier = Optional.empty(); + this.processorSupplier = Optional.empty(); this.resultProcessors = new ClassValue<ConvertBodyProcessor>() { @Override protected ConvertBodyProcessor computeValue(Class<?> type) { @@ -95,12 +95,12 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public Endpoint getDefaultEndpoint() { - return defaultEndpoint; + return defaultEndpoint.orElse(null); } @Override public void setDefaultEndpoint(Endpoint defaultEndpoint) { - this.defaultEndpoint = defaultEndpoint; + this.defaultEndpoint = Optional.of(defaultEndpoint); } @Override @@ -168,7 +168,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) { - this.templateCustomizer = templateCustomizer; + this.templateCustomizer = Optional.of(templateCustomizer); return this; } @@ -179,7 +179,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) { - this.exchangeSupplier = exchangeSupplier; + this.exchangeSupplier = Optional.of(exchangeSupplier); return this; } @@ -190,7 +190,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) { - this.processorSupplier = processorSupplier; + this.processorSupplier = Optional.of(processorSupplier); return this; } @@ -201,7 +201,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public FluentProducerTemplate to(Endpoint endpoint) { - this.endpoint = endpoint; + this.endpoint = Optional.of(endpoint); return this; } @@ -217,13 +217,13 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override @SuppressWarnings("unchecked") public <T> T request(Class<T> type) throws CamelExecutionException { - T result; - Endpoint target = endpoint != null ? endpoint : defaultEndpoint; - // we must have an endpoint to send to - if (target == null) { - throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)"); - } + // Determine the target endpoint + final Endpoint target = target(); + + // Create the default processor if not provided. + final Supplier<Processor> processorSupplier = this.processorSupplier.orElse(() -> defaultProcessor()); + T result; if (type == Exchange.class) { result = (T)template().request(target, processorSupplier.get()); } else if (type == Message.class) { @@ -253,18 +253,23 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public <T> Future<T> asyncRequest(Class<T> type) { - Endpoint target = endpoint != null ? endpoint : defaultEndpoint; - - // we must have an endpoint to send to - if (target == null) { - throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)"); - } + // Determine the target endpoint + final Endpoint target = target(); Future<T> result; - if (headers != null) { - result = template().asyncRequestBodyAndHeaders(target, body, headers, type); + if (ObjectHelper.isNotEmpty(headers)) { + // Make a copy of the headers and body so that async processing won't + // be invalidated by subsequent reuse of the template + final Map<String, Object> headersCopy = new HashMap<>(headers); + final Object bodyCopy = body; + + result = template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type); } else { - result = template().asyncRequestBody(target, body, type); + // Make a copy of the and body so that async processing won't be + // invalidated by subsequent reuse of the template + final Object bodyCopy = body; + + result = template().asyncRequestBody(target, bodyCopy, type); } return result; @@ -276,30 +281,22 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public Exchange send() throws CamelExecutionException { - Endpoint target = endpoint != null ? endpoint : defaultEndpoint; - - // we must have an endpoint to send to - if (target == null) { - throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)"); - } + // Determine the target endpoint + final Endpoint target = target(); - return exchangeSupplier != null - ? template().send(target, exchangeSupplier.get()) - : template().send(target, processorSupplier.get()); + return exchangeSupplier.isPresent() + ? template().send(target, exchangeSupplier.get().get()) + : template().send(target, processorSupplier.orElse(() -> defaultProcessor()).get()); } @Override public Future<Exchange> asyncSend() { - Endpoint target = endpoint != null ? endpoint : defaultEndpoint; - - // we must have an endpoint to send to - if (target == null) { - throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)"); - } + // Determine the target endpoint + final Endpoint target = target(); - return exchangeSupplier != null - ? template().asyncSend(target, exchangeSupplier.get()) - : template().asyncSend(target, processorSupplier.get()); + return exchangeSupplier.isPresent() + ? template().asyncSend(target, exchangeSupplier.get().get()) + : template().asyncSend(target, processorSupplier.orElse(() -> defaultAsyncProcessor()).get()); } // ************************ @@ -320,25 +317,40 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu if (template == null) { template = maximumCacheSize > 0 ? context.createProducerTemplate(maximumCacheSize) : context.createProducerTemplate(); - if (defaultEndpoint != null) { - template.setDefaultEndpoint(defaultEndpoint); - } + defaultEndpoint.ifPresent(template::setDefaultEndpoint); template.setEventNotifierEnabled(eventNotifierEnabled); - if (templateCustomizer != null) { - templateCustomizer.accept(template); - } + templateCustomizer.ifPresent(tc -> tc.accept(template)); } return template; } - private void populateExchange(Exchange exchange) throws Exception { - if (headers != null && !headers.isEmpty()) { - exchange.getIn().getHeaders().putAll(headers); + private Processor defaultProcessor() { + return exchange -> { + ObjectHelper.ifNotEmpty(headers, exchange.getIn().getHeaders()::putAll); + ObjectHelper.ifNotEmpty(body, exchange.getIn()::setBody); + }; + } + + private Processor defaultAsyncProcessor() { + final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers) ? new HashMap<>(this.headers) : null; + final Object bodyCopy = this.body; + + return exchange -> { + ObjectHelper.ifNotEmpty(headersCopy, exchange.getIn().getHeaders()::putAll); + ObjectHelper.ifNotEmpty(bodyCopy, exchange.getIn()::setBody); + }; + } + + private Endpoint target() { + if (endpoint.isPresent()) { + return endpoint.get(); } - if (body != null) { - exchange.getIn().setBody(body); + if (defaultEndpoint.isPresent()) { + return defaultEndpoint.get(); } + + throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)"); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/76249571/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java index 5f852c7..d9263df 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java @@ -46,6 +46,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Scanner; import java.util.concurrent.Callable; +import java.util.function.Consumer; import java.util.function.Function; import org.w3c.dom.Node; @@ -378,22 +379,37 @@ public final class ObjectHelper { } /** - * Tests whether the value is <b>not</b> <tt>null</tt> or an empty string. + * Tests whether the value is <b>not</b> <tt>null</tt>, an empty string or an empty collection. * * @param value the value, if its a String it will be tested for text length as well * @return true if <b>not</b> empty */ + @SuppressWarnings("unchecked") public static boolean isNotEmpty(Object value) { if (value == null) { return false; } else if (value instanceof String) { String text = (String) value; return text.trim().length() > 0; + } else if (value instanceof Collection) { + return !((Collection<?>)value).isEmpty(); } else { return true; } } + /** + * Tests whether the value is <b>not</b> <tt>null</tt>, an empty string or an empty collection. + * + * @param value the value, if its a String it will be tested for text length as well + * @param consumer the consumer, the operation to be executed against value if not empty + */ + public static <T> void ifNotEmpty(T value, Consumer<T> consumer) { + if (isNotEmpty(value)) { + consumer.accept(value); + } + } + public static String[] splitOnCharacter(String value, String needle, int count) { String rc[] = new String[count]; rc[0] = value; http://git-wip-us.apache.org/repos/asf/camel/blob/76249571/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java index 4308fc6..660e86d 100644 --- a/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java +++ b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.builder; +import java.util.concurrent.Future; + import org.apache.camel.CamelExecutionException; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; @@ -47,6 +49,21 @@ public class FluentProducerTemplateTest extends ContextTestSupport { } } + public void testDefaultEndpoint() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Bye World"); + + FluentProducerTemplate fluent = context.createFluentProducerTemplate(); + fluent.setDefaultEndpointUri("direct:in"); + + Object result = fluent.withBody("Hello World").request(); + assertMockEndpointsSatisfied(); + + assertEquals("Bye World", result); + + assertSame(context, fluent.getCamelContext()); + } + public void testFromCamelContext() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("Bye World"); @@ -306,6 +323,52 @@ public class FluentProducerTemplateTest extends ContextTestSupport { ); } + public void testAsyncRequest() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:async"); + mock.expectedMessageCount(2); + mock.expectedHeaderValuesReceivedInAnyOrder("action", "action-1", "action-2"); + mock.expectedBodiesReceivedInAnyOrder("body-1", "body-2"); + + FluentProducerTemplate fluent = context.createFluentProducerTemplate(); + Future<String> future1 = fluent.to("direct:async").withHeader("action", "action-1").withBody("body-1").asyncRequest(String.class); + Future<String> future2 = fluent.to("direct:async").withHeader("action", "action-2").withBody("body-2").asyncRequest(String.class); + + String result1 = future1.get(); + String result2 = future2.get(); + + mock.assertIsSatisfied(); + + assertEquals("body-1", result1); + assertEquals("body-2", result2); + + String action = mock.getExchanges().get(0).getIn().getHeader("action", String.class); + if (action.equals("action-1")) { + assertEquals("body-1", mock.getExchanges().get(0).getIn().getBody(String.class)); + } + if (action.equals("action-2")) { + assertEquals("body-2", mock.getExchanges().get(0).getIn().getBody(String.class)); + } + } + + public void testAsyncSend() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:async"); + mock.expectedMessageCount(2); + + FluentProducerTemplate fluent = context.createFluentProducerTemplate(); + + Future<Exchange> future1 = fluent.to("direct:async").withHeader("action", "action-1").withBody("body-1").asyncSend(); + Future<Exchange> future2 = fluent.to("direct:async").withHeader("action", "action-2").withBody("body-2").asyncSend(); + + Exchange exchange1 = future1.get(); + Exchange exchange2 = future2.get(); + + assertEquals("action-1", exchange1.getIn().getHeader("action", String.class)); + assertEquals("body-1", exchange1.getIn().getBody(String.class)); + + assertEquals("action-2", exchange2.getIn().getHeader("action", String.class)); + assertEquals("body-2", exchange2.getIn().getBody(String.class)); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -343,6 +406,9 @@ public class FluentProducerTemplateTest extends ContextTestSupport { .to("mock:result"); from("direct:inout").transform(constant(123)); + + from("direct:async") + .to("mock:async"); } }; }