This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new d4cfbd3  CAMEL-15362: FluentProducerTemplate should not keep old state 
when reused on the same thread as it is a bit cumbersome to have to clear all state 
manually.
d4cfbd3 is described below

commit d4cfbd3631a0b277b5228224e1855c7229062c21
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Aug 2 14:48:25 2020 +0200

    CAMEL-15362: FluentProducerTemplate should not keep old state when reused 
on the same thread as it is a bit cumbersome to have to clear all state manually.
---
 .../org/apache/camel/FluentProducerTemplate.java   |  11 +-
 .../impl/engine/DefaultFluentProducerTemplate.java | 204 ++++++++++++++-------
 .../camel/builder/FluentProducerTemplateTest.java  |  19 ++
 .../ROOT/pages/camel-3x-upgrade-guide-3_5.adoc     |   8 +
 4 files changed, 175 insertions(+), 67 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java 
b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
index f086e25..e9eaaf3 100644
--- a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
+++ b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
@@ -140,7 +140,10 @@ public interface FluentProducerTemplate extends Service {
 
     /**
      * Remove the body and headers.
+     *
+     * @deprecated the template automatically clears its state when sending
      */
+    @Deprecated
     FluentProducerTemplate clearAll();
 
     /**
@@ -153,7 +156,10 @@ public interface FluentProducerTemplate extends Service {
 
     /**
      * Remove the headers.
+     *
+     * @deprecated the template automatically clears its state when sending
      */
+    @Deprecated
     FluentProducerTemplate clearHeaders();
 
     /**
@@ -173,7 +179,10 @@ public interface FluentProducerTemplate extends Service {
 
     /**
      * Remove the body.
+     *
+     * @deprecated the template automatically clears its state when sending
      */
+    @Deprecated
     FluentProducerTemplate clearBody();
 
     /**
@@ -236,7 +245,7 @@ public interface FluentProducerTemplate extends Service {
      *     .request()}
      * </pre>
      *
-     * @param processor
+     * @param processor the processor
      */
     FluentProducerTemplate withProcessor(Processor processor);
 
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
index 312593a..76d12b4 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
@@ -41,12 +41,12 @@ import org.apache.camel.util.ObjectHelper;
 public class DefaultFluentProducerTemplate extends ServiceSupport implements 
FluentProducerTemplate {
 
     // transient state of endpoint, headers and body which needs to be thread 
local scoped to be thread-safe
-    private final ThreadLocal<Map<String, Object>> headers = new 
ThreadLocal<>();
-    private final ThreadLocal<Object> body = new ThreadLocal<>();
-    private final ThreadLocal<Endpoint> endpoint = new ThreadLocal<>();
-    private final ThreadLocal<Supplier<Exchange>> exchangeSupplier = new 
ThreadLocal<>();
-    private final ThreadLocal<Supplier<Processor>> processorSupplier = new 
ThreadLocal<>();
-    private final ThreadLocal<Consumer<ProducerTemplate>> templateCustomizer = 
new ThreadLocal<>();
+    private Map<String, Object> headers;
+    private Object body;
+    private Endpoint endpoint;
+    private Supplier<Exchange> exchangeSupplier;
+    private Supplier<Processor> processorSupplier;
+    private Consumer<ProducerTemplate> templateCustomizer;
 
     private final CamelContext context;
     private final ClassValue<ConvertBodyProcessor> resultProcessors;
@@ -54,6 +54,7 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
     private int maximumCacheSize;
     private boolean eventNotifierEnabled;
     private volatile ProducerTemplate template;
+    private volatile boolean cloned;
 
     public DefaultFluentProducerTemplate(CamelContext context) {
         this.context = context;
@@ -66,6 +67,29 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
         };
     }
 
+    private DefaultFluentProducerTemplate(CamelContext context, 
ClassValue<ConvertBodyProcessor> resultProcessors,
+                                          Endpoint defaultEndpoint, int 
maximumCacheSize, boolean eventNotifierEnabled, ProducerTemplate template) {
+        this.context = context;
+        this.resultProcessors = resultProcessors;
+        this.defaultEndpoint = defaultEndpoint;
+        this.maximumCacheSize = maximumCacheSize;
+        this.eventNotifierEnabled = eventNotifierEnabled;
+        this.template = template;
+        this.cloned = true;
+    }
+
+    private DefaultFluentProducerTemplate newClone() {
+        return new DefaultFluentProducerTemplate(context, resultProcessors, 
defaultEndpoint, maximumCacheSize, eventNotifierEnabled, template);
+    }
+
+    private DefaultFluentProducerTemplate checkCloned() {
+        if (!cloned) {
+            return newClone();
+        } else {
+            return this;
+        }
+    }
+
     @Override
     public CamelContext getCamelContext() {
         return context;
@@ -98,6 +122,9 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
 
     @Override
     public void setDefaultEndpoint(Endpoint defaultEndpoint) {
+        if (this.defaultEndpoint != null && isStarted()) {
+            throw new IllegalArgumentException("Not allowed after template has 
been started");
+        }
         this.defaultEndpoint = defaultEndpoint;
     }
 
@@ -108,6 +135,9 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
 
     @Override
     public void setMaximumCacheSize(int maximumCacheSize) {
+        if (this.maximumCacheSize != 0 && isStarted()) {
+            throw new IllegalArgumentException("Not allowed after template has 
been started");
+        }
         this.maximumCacheSize = maximumCacheSize;
     }
 
@@ -118,6 +148,9 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
 
     @Override
     public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
+        if (isStarted()) {
+            throw new IllegalArgumentException("Not allowed after template has 
been started");
+        }
         this.eventNotifierEnabled = eventNotifierEnabled;
     }
 
@@ -131,52 +164,60 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate withHeader(String key, Object value) {
-        Map<String, Object> map = headers.get();
+        DefaultFluentProducerTemplate clone = checkCloned();
+
+        Map<String, Object> map = clone.headers;
         if (map == null) {
             map = new LinkedHashMap<>();
-            headers.set(map);
+            clone.headers = map;
         }
-
         map.put(key, value);
-
-        return this;
+        return clone;
     }
 
     @Override
     public FluentProducerTemplate clearHeaders() {
-        headers.remove();
+        DefaultFluentProducerTemplate clone = checkCloned();
 
-        return this;
+        if (clone.headers != null) {
+            clone.headers.clear();
+        }
+        return clone;
     }
 
     @Override
     public FluentProducerTemplate withBody(Object body) {
-        this.body.set(body);
+        DefaultFluentProducerTemplate clone = checkCloned();
 
-        return this;
+        clone.body = body;
+        return clone;
     }
 
     @Override
     public FluentProducerTemplate withBodyAs(Object body, Class<?> type) {
+        DefaultFluentProducerTemplate clone = checkCloned();
+
         Object b = type != null
-            ? context.getTypeConverter().convertTo(type, body)
+            ? clone.context.getTypeConverter().convertTo(type, body)
             : body;
-
-        this.body.set(b);
-
-        return this;
+        clone.body = b;
+        return clone;
     }
 
     @Override
     public FluentProducerTemplate clearBody() {
-        body.remove();
+        DefaultFluentProducerTemplate clone = checkCloned();
 
-        return this;
+        clone.body = null;
+        return clone;
     }
 
     @Override
     public FluentProducerTemplate withTemplateCustomizer(final 
Consumer<ProducerTemplate> templateCustomizer) {
-        this.templateCustomizer.set(templateCustomizer);
+        if (this.templateCustomizer != null && isStarted()) {
+            throw new IllegalArgumentException("Not allowed after template has 
been started");
+        }
+        this.templateCustomizer = templateCustomizer;
 
         if (template != null) {
             // need to re-initialize template since we have a customizer
@@ -195,8 +236,10 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate withExchange(final Supplier<Exchange> 
exchangeSupplier) {
-        this.exchangeSupplier.set(exchangeSupplier);
-        return this;
+        DefaultFluentProducerTemplate clone = checkCloned();
+
+        clone.exchangeSupplier = exchangeSupplier;
+        return clone;
     }
 
     @Override
@@ -206,8 +249,10 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate withProcessor(final Supplier<Processor> 
processorSupplier) {
-        this.processorSupplier.set(processorSupplier);
-        return this;
+        DefaultFluentProducerTemplate clone = checkCloned();
+
+        clone.processorSupplier = processorSupplier;
+        return clone;
     }
 
     @Override
@@ -217,8 +262,10 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate to(Endpoint endpoint) {
-        this.endpoint.set(endpoint);
-        return this;
+        DefaultFluentProducerTemplate clone = checkCloned();
+
+        clone.endpoint = endpoint;
+        return clone;
     }
 
     // ************************
@@ -233,31 +280,37 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
     @Override
     @SuppressWarnings("unchecked")
     public <T> T request(Class<T> type) throws CamelExecutionException {
-        if (exchangeSupplier.get() != null) {
+        if (exchangeSupplier != null && exchangeSupplier.get() != null) {
             throw new IllegalArgumentException("withExchange not supported on 
FluentProducerTemplate.request method. Use send method instead.");
         }
 
+        DefaultFluentProducerTemplate clone = checkCloned();
+
         // Determine the target endpoint
-        final Endpoint target = target();
+        final Endpoint target = clone.target();
 
         // Create the default processor if not provided.
-        final Processor processorSupplier = this.processorSupplier.get() != 
null ? this.processorSupplier.get().get() : defaultProcessor();
+        Processor processor = clone.processorSupplier != null ? 
clone.processorSupplier.get() : null;
+        final Processor processorSupplier = processor != null ? processor : 
clone.defaultProcessor();
+
+        // reset cloned flag so when we use it again it has to set values again
+        cloned = false;
 
         T result;
         if (type == Exchange.class) {
-            result = (T)template().request(target, processorSupplier);
+            result = (T)clone.template().request(target, processorSupplier);
         } else if (type == Message.class) {
-            Exchange exchange = template().request(target, processorSupplier);
+            Exchange exchange = clone.template().request(target, 
processorSupplier);
             result = (T)exchange.getMessage();
         } else {
-            Exchange exchange = template().send(
+            Exchange exchange = clone.template().send(
                 target,
                 ExchangePattern.InOut,
                 processorSupplier,
-                resultProcessors.get(type)
+                clone.resultProcessors.get(type)
             );
 
-            result = context.getTypeConverter().convertTo(
+            result = clone.context.getTypeConverter().convertTo(
                 type,
                 ExchangeHelper.extractResultBody(exchange, 
exchange.getPattern())
             );
@@ -273,23 +326,28 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
 
     @Override
     public <T> Future<T> asyncRequest(Class<T> type) {
+        DefaultFluentProducerTemplate clone = checkCloned();
+
         // Determine the target endpoint
-        final Endpoint target = target();
+        final Endpoint target = clone.target();
+
+        // reset cloned flag so when we use it again it has to set values again
+        cloned = false;
 
         Future<T> result;
-        if (ObjectHelper.isNotEmpty(headers.get())) {
+        if (ObjectHelper.isNotEmpty(clone.headers)) {
             // Make a copy of the headers and body so that async processing 
won't
             // be invalidated by subsequent reuse of the template
-            final Map<String, Object> headersCopy = new 
HashMap<>(headers.get());
-            final Object bodyCopy = body.get();
+            final Map<String, Object> headersCopy = new 
HashMap<>(clone.headers);
+            final Object bodyCopy = clone.body;
 
-            result = template().asyncRequestBodyAndHeaders(target, bodyCopy, 
headersCopy, type);
+            result = clone.template().asyncRequestBodyAndHeaders(target, 
bodyCopy, headersCopy, type);
         } else {
             // Make a copy of the body so that async processing won't be
             // invalidated by subsequent reuse of the template
-            final Object bodyCopy = body.get();
+            final Object bodyCopy = clone.body;
 
-            result = template().asyncRequestBody(target, bodyCopy, type);
+            result = clone.template().asyncRequestBody(target, bodyCopy, type);
         }
 
         return result;
@@ -301,29 +359,41 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
 
     @Override
     public Exchange send() throws CamelExecutionException {
+        DefaultFluentProducerTemplate clone = checkCloned();
+
         // Determine the target endpoint
-        final Endpoint target = target();
+        final Endpoint target = clone.target();
+
+        // reset cloned flag so when we use it again it has to set values again
+        cloned = false;
 
-        Exchange exchange = exchangeSupplier.get() != null ? 
exchangeSupplier.get().get() : null;
+        Exchange exchange = clone.exchangeSupplier != null ? 
clone.exchangeSupplier.get() : null;
         if (exchange != null) {
-            return template().send(target, exchange);
+            return clone.template().send(target, exchange);
         } else {
-            Processor processor = processorSupplier.get() != null ? 
processorSupplier.get().get() : defaultProcessor();
-            return template().send(target, processor);
+            Processor proc = clone.processorSupplier != null ? 
clone.processorSupplier.get() : null;
+            final Processor processor = proc != null ? proc : 
clone.defaultProcessor();
+            return clone.template().send(target, processor);
         }
     }
 
     @Override
     public Future<Exchange> asyncSend() {
+        DefaultFluentProducerTemplate clone = checkCloned();
+
         // Determine the target endpoint
-        final Endpoint target = target();
+        final Endpoint target = clone.target();
 
-        Exchange exchange = exchangeSupplier.get() != null ? 
exchangeSupplier.get().get() : null;
+        // reset cloned flag so when we use it again it has to set values again
+        cloned = false;
+
+        Exchange exchange = clone.exchangeSupplier != null ? 
clone.exchangeSupplier.get() : null;
         if (exchange != null) {
-            return template().asyncSend(target, exchange);
+            return clone.template().asyncSend(target, exchange);
         } else {
-            Processor processor = processorSupplier.get() != null ? 
processorSupplier.get().get() : defaultAsyncProcessor();
-            return template().asyncSend(target, processor);
+            Processor proc = clone.processorSupplier != null ? 
clone.processorSupplier.get() : null;
+            final Processor processor = proc != null ? proc : 
clone.defaultAsyncProcessor();
+            return clone.template().asyncSend(target, processor);
         }
     }
 
@@ -339,6 +409,8 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
     public static FluentProducerTemplate on(CamelContext context) {
         DefaultFluentProducerTemplate fluent = new 
DefaultFluentProducerTemplate(context);
         fluent.start();
+        // mark it as cloned as it's started
+        fluent.cloned = true;
         return fluent;
     }
 
@@ -348,14 +420,14 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
 
     private Processor defaultProcessor() {
         return exchange -> {
-            ObjectHelper.ifNotEmpty(headers.get(), 
exchange.getIn().getHeaders()::putAll);
-            ObjectHelper.ifNotEmpty(body.get(), exchange.getIn()::setBody);
+            ObjectHelper.ifNotEmpty(headers, 
exchange.getIn().getHeaders()::putAll);
+            ObjectHelper.ifNotEmpty(body, exchange.getIn()::setBody);
         };
     }
 
     private Processor defaultAsyncProcessor() {
-        final Map<String, Object> headersCopy = 
ObjectHelper.isNotEmpty(this.headers.get()) ? new HashMap<>(this.headers.get()) 
: null;
-        final Object bodyCopy = this.body.get();
+        final Map<String, Object> headersCopy = 
ObjectHelper.isNotEmpty(this.headers) ? new HashMap<>(this.headers) : null;
+        final Object bodyCopy = this.body;
 
         return exchange -> {
             ObjectHelper.ifNotEmpty(headersCopy, 
exchange.getIn().getHeaders()::putAll);
@@ -364,8 +436,8 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
     }
 
     private Endpoint target() {
-        if (endpoint.get() != null) {
-            return endpoint.get();
+        if (endpoint != null) {
+            return endpoint;
         }
         if (defaultEndpoint != null) {
             return defaultEndpoint;
@@ -386,8 +458,8 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
             template.setDefaultEndpoint(defaultEndpoint);
         }
         template.setEventNotifierEnabled(eventNotifierEnabled);
-        if (templateCustomizer.get() != null) {
-            templateCustomizer.get().accept(template);
+        if (templateCustomizer != null) {
+            templateCustomizer.accept(template);
         }
         ServiceHelper.initService(template);
     }
@@ -400,10 +472,10 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
     @Override
     protected void doStop() throws Exception {
         clearAll();
-        this.endpoint.remove();
-        this.exchangeSupplier.remove();
-        this.processorSupplier.remove();
-        this.templateCustomizer.remove();
+        this.endpoint = null;
+        this.exchangeSupplier = null;
+        this.processorSupplier = null;
+        this.templateCustomizer = null;
         ServiceHelper.stopService(template);
     }
 
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
index 95cf165..5da24d7 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
@@ -345,6 +345,22 @@ public class FluentProducerTemplateTest extends 
ContextTestSupport {
         assertMockEndpointsSatisfied();
     }
 
+    @Test
+    public void testUseTwoTimesSameThread() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:echo");
+        mock.expectedBodiesReceived("Camel", "World");
+        mock.message(0).header("foo").isEqualTo("!");
+        mock.message(1).header("foo").isNull();
+
+        FluentProducerTemplate fluent = context.createFluentProducerTemplate();
+        Object result = fluent.withBody("Camel").withHeader("foo", 
"!").to("direct:echo").request();
+        Object result2 = fluent.withBody("World").to("direct:echo").request();
+        assertEquals("CamelCamel!", result);
+        assertEquals("WorldWorld", result2);
+
+        assertMockEndpointsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -370,6 +386,9 @@ public class FluentProducerTemplateTest extends 
ContextTestSupport {
                 from("direct:inout").transform(constant(123));
 
                 from("direct:async").to("mock:async");
+
+                
from("direct:echo").to("mock:echo").setBody().simple("${body}${body}${header.foo}");
+
             }
         };
     }
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_5.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_5.adoc
index a55d143..c7a6795 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_5.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_5.adoc
@@ -6,6 +6,14 @@ from both 3.0 to 3.1 and 3.1 to 3.2.
 
 == Upgrading Camel 3.4 to 3.5
 
+=== FluentProducerTemplate
+
+The template will now automatically clear its state after sending the message; this 
avoids end users having to call `clearAll` after usage,
+in case the template should be reused to send other messages.
+
+After the template has been started or used for the first time, its general 
configuration can no longer be altered;
+create a new template instead.
+
 === camel-bean
 
 The `bean(class)` EIP will now lookup in the registry first whether there is a 
single bean instance of the given class type

Reply via email to