This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new d4cfbd3 CAMEL-15362: FluentProducerTemplate should not keep old state when reused on same thread as its a bit cumbersome to have to clear all state manually. d4cfbd3 is described below commit d4cfbd3631a0b277b5228224e1855c7229062c21 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Aug 2 14:48:25 2020 +0200 CAMEL-15362: FluentProducerTemplate should not keep old state when reused on same thread as its a bit cumbersome to have to clear all state manually. --- .../org/apache/camel/FluentProducerTemplate.java | 11 +- .../impl/engine/DefaultFluentProducerTemplate.java | 204 ++++++++++++++------- .../camel/builder/FluentProducerTemplateTest.java | 19 ++ .../ROOT/pages/camel-3x-upgrade-guide-3_5.adoc | 8 + 4 files changed, 175 insertions(+), 67 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java index f086e25..e9eaaf3 100644 --- a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java +++ b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java @@ -140,7 +140,10 @@ public interface FluentProducerTemplate extends Service { /** * Remove the body and headers. + * + * @deprecated the template automatic clears when sending */ + @Deprecated FluentProducerTemplate clearAll(); /** @@ -153,7 +156,10 @@ public interface FluentProducerTemplate extends Service { /** * Remove the headers. + * + * @deprecated the template automatic clears when sending */ + @Deprecated FluentProducerTemplate clearHeaders(); /** @@ -173,7 +179,10 @@ public interface FluentProducerTemplate extends Service { /** * Remove the body. 
+ * + * @deprecated the template automatic clears when sending */ + @Deprecated FluentProducerTemplate clearBody(); /** @@ -236,7 +245,7 @@ public interface FluentProducerTemplate extends Service { * .request()} * </pre> * - * @param processor + * @param processor the processor */ FluentProducerTemplate withProcessor(Processor processor); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java index 312593a..76d12b4 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java @@ -41,12 +41,12 @@ import org.apache.camel.util.ObjectHelper; public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate { // transient state of endpoint, headers and body which needs to be thread local scoped to be thread-safe - private final ThreadLocal<Map<String, Object>> headers = new ThreadLocal<>(); - private final ThreadLocal<Object> body = new ThreadLocal<>(); - private final ThreadLocal<Endpoint> endpoint = new ThreadLocal<>(); - private final ThreadLocal<Supplier<Exchange>> exchangeSupplier = new ThreadLocal<>(); - private final ThreadLocal<Supplier<Processor>> processorSupplier = new ThreadLocal<>(); - private final ThreadLocal<Consumer<ProducerTemplate>> templateCustomizer = new ThreadLocal<>(); + private Map<String, Object> headers; + private Object body; + private Endpoint endpoint; + private Supplier<Exchange> exchangeSupplier; + private Supplier<Processor> processorSupplier; + private Consumer<ProducerTemplate> templateCustomizer; private final CamelContext context; private final ClassValue<ConvertBodyProcessor> resultProcessors; @@ -54,6 +54,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu private int 
maximumCacheSize; private boolean eventNotifierEnabled; private volatile ProducerTemplate template; + private volatile boolean cloned; public DefaultFluentProducerTemplate(CamelContext context) { this.context = context; @@ -66,6 +67,29 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu }; } + private DefaultFluentProducerTemplate(CamelContext context, ClassValue<ConvertBodyProcessor> resultProcessors, + Endpoint defaultEndpoint, int maximumCacheSize, boolean eventNotifierEnabled, ProducerTemplate template) { + this.context = context; + this.resultProcessors = resultProcessors; + this.defaultEndpoint = defaultEndpoint; + this.maximumCacheSize = maximumCacheSize; + this.eventNotifierEnabled = eventNotifierEnabled; + this.template = template; + this.cloned = true; + } + + private DefaultFluentProducerTemplate newClone() { + return new DefaultFluentProducerTemplate(context, resultProcessors, defaultEndpoint, maximumCacheSize, eventNotifierEnabled, template); + } + + private DefaultFluentProducerTemplate checkCloned() { + if (!cloned) { + return newClone(); + } else { + return this; + } + } + @Override public CamelContext getCamelContext() { return context; @@ -98,6 +122,9 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public void setDefaultEndpoint(Endpoint defaultEndpoint) { + if (this.defaultEndpoint != null && isStarted()) { + throw new IllegalArgumentException("Not allowed after template has been started"); + } this.defaultEndpoint = defaultEndpoint; } @@ -108,6 +135,9 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public void setMaximumCacheSize(int maximumCacheSize) { + if (this.maximumCacheSize != 0 && isStarted()) { + throw new IllegalArgumentException("Not allowed after template has been started"); + } this.maximumCacheSize = maximumCacheSize; } @@ -118,6 +148,9 @@ public class DefaultFluentProducerTemplate extends ServiceSupport 
implements Flu @Override public void setEventNotifierEnabled(boolean eventNotifierEnabled) { + if (isStarted()) { + throw new IllegalArgumentException("Not allowed after template has been started"); + } this.eventNotifierEnabled = eventNotifierEnabled; } @@ -131,52 +164,60 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public FluentProducerTemplate withHeader(String key, Object value) { - Map<String, Object> map = headers.get(); + DefaultFluentProducerTemplate clone = checkCloned(); + + Map<String, Object> map = clone.headers; if (map == null) { map = new LinkedHashMap<>(); - headers.set(map); + clone.headers = map; } - map.put(key, value); - - return this; + return clone; } @Override public FluentProducerTemplate clearHeaders() { - headers.remove(); + DefaultFluentProducerTemplate clone = checkCloned(); - return this; + if (clone.headers != null) { + clone.headers.clear(); + } + return clone; } @Override public FluentProducerTemplate withBody(Object body) { - this.body.set(body); + DefaultFluentProducerTemplate clone = checkCloned(); - return this; + clone.body = body; + return clone; } @Override public FluentProducerTemplate withBodyAs(Object body, Class<?> type) { + DefaultFluentProducerTemplate clone = checkCloned(); + Object b = type != null - ? context.getTypeConverter().convertTo(type, body) + ? 
clone.context.getTypeConverter().convertTo(type, body) : body; - - this.body.set(b); - - return this; + clone.body = b; + return clone; } @Override public FluentProducerTemplate clearBody() { - body.remove(); + DefaultFluentProducerTemplate clone = checkCloned(); - return this; + clone.body = null; + return clone; } @Override public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) { - this.templateCustomizer.set(templateCustomizer); + if (this.templateCustomizer != null && isStarted()) { + throw new IllegalArgumentException("Not allowed after template has been started"); + } + this.templateCustomizer = templateCustomizer; if (template != null) { // need to re-initialize template since we have a customizer @@ -195,8 +236,10 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) { - this.exchangeSupplier.set(exchangeSupplier); - return this; + DefaultFluentProducerTemplate clone = checkCloned(); + + clone.exchangeSupplier = exchangeSupplier; + return clone; } @Override @@ -206,8 +249,10 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) { - this.processorSupplier.set(processorSupplier); - return this; + DefaultFluentProducerTemplate clone = checkCloned(); + + clone.processorSupplier = processorSupplier; + return clone; } @Override @@ -217,8 +262,10 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public FluentProducerTemplate to(Endpoint endpoint) { - this.endpoint.set(endpoint); - return this; + DefaultFluentProducerTemplate clone = checkCloned(); + + clone.endpoint = endpoint; + return clone; } // ************************ @@ -233,31 +280,37 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements 
Flu @Override @SuppressWarnings("unchecked") public <T> T request(Class<T> type) throws CamelExecutionException { - if (exchangeSupplier.get() != null) { + if (exchangeSupplier != null && exchangeSupplier.get() != null) { throw new IllegalArgumentException("withExchange not supported on FluentProducerTemplate.request method. Use send method instead."); } + DefaultFluentProducerTemplate clone = checkCloned(); + // Determine the target endpoint - final Endpoint target = target(); + final Endpoint target = clone.target(); // Create the default processor if not provided. - final Processor processorSupplier = this.processorSupplier.get() != null ? this.processorSupplier.get().get() : defaultProcessor(); + Processor processor = clone.processorSupplier != null ? clone.processorSupplier.get() : null; + final Processor processorSupplier = processor != null ? processor : clone.defaultProcessor(); + + // reset cloned flag so when we use it again it has to set values again + cloned = false; T result; if (type == Exchange.class) { - result = (T)template().request(target, processorSupplier); + result = (T)clone.template().request(target, processorSupplier); } else if (type == Message.class) { - Exchange exchange = template().request(target, processorSupplier); + Exchange exchange = clone.template().request(target, processorSupplier); result = (T)exchange.getMessage(); } else { - Exchange exchange = template().send( + Exchange exchange = clone.template().send( target, ExchangePattern.InOut, processorSupplier, - resultProcessors.get(type) + clone.resultProcessors.get(type) ); - result = context.getTypeConverter().convertTo( + result = clone.context.getTypeConverter().convertTo( type, ExchangeHelper.extractResultBody(exchange, exchange.getPattern()) ); @@ -273,23 +326,28 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public <T> Future<T> asyncRequest(Class<T> type) { + DefaultFluentProducerTemplate clone = checkCloned(); + // Determine 
the target endpoint - final Endpoint target = target(); + final Endpoint target = clone.target(); + + // reset cloned flag so when we use it again it has to set values again + cloned = false; Future<T> result; - if (ObjectHelper.isNotEmpty(headers.get())) { + if (ObjectHelper.isNotEmpty(clone.headers)) { // Make a copy of the headers and body so that async processing won't // be invalidated by subsequent reuse of the template - final Map<String, Object> headersCopy = new HashMap<>(headers.get()); - final Object bodyCopy = body.get(); + final Map<String, Object> headersCopy = new HashMap<>(clone.headers); + final Object bodyCopy = clone.body; - result = template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type); + result = clone.template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type); } else { // Make a copy of the and body so that async processing won't be // invalidated by subsequent reuse of the template - final Object bodyCopy = body.get(); + final Object bodyCopy = clone.body; - result = template().asyncRequestBody(target, bodyCopy, type); + result = clone.template().asyncRequestBody(target, bodyCopy, type); } return result; @@ -301,29 +359,41 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public Exchange send() throws CamelExecutionException { + DefaultFluentProducerTemplate clone = checkCloned(); + // Determine the target endpoint - final Endpoint target = target(); + final Endpoint target = clone.target(); + + // reset cloned flag so when we use it again it has to set values again + cloned = false; - Exchange exchange = exchangeSupplier.get() != null ? exchangeSupplier.get().get() : null; + Exchange exchange = clone.exchangeSupplier != null ? clone.exchangeSupplier.get() : null; if (exchange != null) { - return template().send(target, exchange); + return clone.template().send(target, exchange); } else { - Processor processor = processorSupplier.get() != null ? 
processorSupplier.get().get() : defaultProcessor(); - return template().send(target, processor); + Processor proc = clone.processorSupplier != null ? clone.processorSupplier.get() : null; + final Processor processor = proc != null ? proc : clone.defaultProcessor(); + return clone.template().send(target, processor); } } @Override public Future<Exchange> asyncSend() { + DefaultFluentProducerTemplate clone = checkCloned(); + // Determine the target endpoint - final Endpoint target = target(); + final Endpoint target = clone.target(); - Exchange exchange = exchangeSupplier.get() != null ? exchangeSupplier.get().get() : null; + // reset cloned flag so when we use it again it has to set values again + cloned = false; + + Exchange exchange = clone.exchangeSupplier != null ? clone.exchangeSupplier.get() : null; if (exchange != null) { - return template().asyncSend(target, exchange); + return clone.template().asyncSend(target, exchange); } else { - Processor processor = processorSupplier.get() != null ? processorSupplier.get().get() : defaultAsyncProcessor(); - return template().asyncSend(target, processor); + Processor proc = clone.processorSupplier != null ? clone.processorSupplier.get() : null; + final Processor processor = proc != null ? 
proc : clone.defaultAsyncProcessor(); + return clone.template().asyncSend(target, processor); } } @@ -339,6 +409,8 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu public static FluentProducerTemplate on(CamelContext context) { DefaultFluentProducerTemplate fluent = new DefaultFluentProducerTemplate(context); fluent.start(); + // mark it as cloned as its started + fluent.cloned = true; return fluent; } @@ -348,14 +420,14 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu private Processor defaultProcessor() { return exchange -> { - ObjectHelper.ifNotEmpty(headers.get(), exchange.getIn().getHeaders()::putAll); - ObjectHelper.ifNotEmpty(body.get(), exchange.getIn()::setBody); + ObjectHelper.ifNotEmpty(headers, exchange.getIn().getHeaders()::putAll); + ObjectHelper.ifNotEmpty(body, exchange.getIn()::setBody); }; } private Processor defaultAsyncProcessor() { - final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers.get()) ? new HashMap<>(this.headers.get()) : null; - final Object bodyCopy = this.body.get(); + final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers) ? 
new HashMap<>(this.headers) : null; + final Object bodyCopy = this.body; return exchange -> { ObjectHelper.ifNotEmpty(headersCopy, exchange.getIn().getHeaders()::putAll); @@ -364,8 +436,8 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu } private Endpoint target() { - if (endpoint.get() != null) { - return endpoint.get(); + if (endpoint != null) { + return endpoint; } if (defaultEndpoint != null) { return defaultEndpoint; @@ -386,8 +458,8 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu template.setDefaultEndpoint(defaultEndpoint); } template.setEventNotifierEnabled(eventNotifierEnabled); - if (templateCustomizer.get() != null) { - templateCustomizer.get().accept(template); + if (templateCustomizer != null) { + templateCustomizer.accept(template); } ServiceHelper.initService(template); } @@ -400,10 +472,10 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override protected void doStop() throws Exception { clearAll(); - this.endpoint.remove(); - this.exchangeSupplier.remove(); - this.processorSupplier.remove(); - this.templateCustomizer.remove(); + this.endpoint = null; + this.exchangeSupplier = null; + this.processorSupplier = null; + this.templateCustomizer = null; ServiceHelper.stopService(template); } diff --git a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java index 95cf165..5da24d7 100644 --- a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java @@ -345,6 +345,22 @@ public class FluentProducerTemplateTest extends ContextTestSupport { assertMockEndpointsSatisfied(); } + @Test + public void testUseTwoTimesSameThread() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:echo"); + 
mock.expectedBodiesReceived("Camel", "World"); + mock.message(0).header("foo").isEqualTo("!"); + mock.message(1).header("foo").isNull(); + + FluentProducerTemplate fluent = context.createFluentProducerTemplate(); + Object result = fluent.withBody("Camel").withHeader("foo", "!").to("direct:echo").request(); + Object result2 = fluent.withBody("World").to("direct:echo").request(); + assertEquals("CamelCamel!", result); + assertEquals("WorldWorld", result2); + + assertMockEndpointsSatisfied(); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -370,6 +386,9 @@ public class FluentProducerTemplateTest extends ContextTestSupport { from("direct:inout").transform(constant(123)); from("direct:async").to("mock:async"); + + from("direct:echo").to("mock:echo").setBody().simple("${body}${body}${header.foo}"); + } }; } diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_5.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_5.adoc index a55d143..c7a6795 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_5.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_5.adoc @@ -6,6 +6,14 @@ from both 3.0 to 3.1 and 3.1 to 3.2. == Upgrading Camel 3.4 to 3.5 +=== FluentProducerTemplate + +The template will now automatically clear its state when sending the message, this avoids end users having to call `clearAll` after usage, +in case the template should be reused to send other messages. + +After the template has been started / used the first time, then its general configuration cannot be altered later, +instead create a new template. + === camel-bean The `bean(class)` EIP will now lookup in the registry first whether there is a single bean instance of the given class type