This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new ce44147 CAMEL-12968: Fix FluentProducerTemplate to be thread-safe when you also set endpoints and other states. ce44147 is described below commit ce441479b98eedd4371d011c5230fc5b1acd0e79 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Aug 1 11:01:47 2019 +0200 CAMEL-12968: Fix FluentProducerTemplate to be thread-safe when you also set endpoints and other states. --- .../impl/engine/DefaultFluentProducerTemplate.java | 82 +++++++++++++--------- 1 file changed, 47 insertions(+), 35 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java index 0d8c579..9a19e65 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java @@ -17,8 +17,8 @@ package org.apache.camel.impl.engine; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; -import java.util.Optional; import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Supplier; @@ -40,29 +40,24 @@ import org.apache.camel.util.ObjectHelper; public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate { - // transient state of headers and body which needs to be thread local scoped to be thread-safe + // transient state of endpoint, headers and body which needs to be thread local scoped to be thread-safe private final ThreadLocal<Map<String, Object>> headers = new ThreadLocal<>(); private final ThreadLocal<Object> body = new ThreadLocal<>(); + private final ThreadLocal<Endpoint> endpoint = new ThreadLocal<>(); + private final ThreadLocal<Supplier<Exchange>> exchangeSupplier = new ThreadLocal<>(); + private final ThreadLocal<Supplier<Processor>> processorSupplier = new ThreadLocal<>(); + private final ThreadLocal<Consumer<ProducerTemplate>> templateCustomizer = new ThreadLocal<>(); private final CamelContext context; private final ClassValue<ConvertBodyProcessor> resultProcessors; - private Optional<Consumer<ProducerTemplate>> templateCustomizer; - private Optional<Supplier<Exchange>> exchangeSupplier; - private Optional<Supplier<Processor>> processorSupplier; - private Optional<Endpoint> endpoint; - private Optional<Endpoint> defaultEndpoint; + private Endpoint defaultEndpoint; private int maximumCacheSize; private boolean eventNotifierEnabled; private volatile ProducerTemplate template; public DefaultFluentProducerTemplate(CamelContext context) { this.context = context; - this.endpoint = Optional.empty(); - this.defaultEndpoint = Optional.empty(); this.eventNotifierEnabled = true; - this.templateCustomizer = Optional.empty(); - this.exchangeSupplier = Optional.empty(); - this.processorSupplier = Optional.empty(); this.resultProcessors = new ClassValue<ConvertBodyProcessor>() { @Override protected ConvertBodyProcessor computeValue(Class<?> type) { @@ -98,12 +93,12 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public Endpoint getDefaultEndpoint() { - return defaultEndpoint.orElse(null); + return defaultEndpoint; } @Override public void setDefaultEndpoint(Endpoint defaultEndpoint) { - this.defaultEndpoint = Optional.ofNullable(defaultEndpoint); + this.defaultEndpoint = defaultEndpoint; } @Override @@ -138,7 +133,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu public FluentProducerTemplate withHeader(String key, Object value) { Map<String, Object> map = headers.get(); if (map == null) { - map = new HashMap<>(); + map = new LinkedHashMap<>(); headers.set(map); } @@ -181,7 +176,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) { - this.templateCustomizer = Optional.of(templateCustomizer); + this.templateCustomizer.set(templateCustomizer); return this; } @@ -192,7 +187,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) { - this.exchangeSupplier = Optional.of(exchangeSupplier); + this.exchangeSupplier.set(exchangeSupplier); return this; } @@ -203,7 +198,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) { - this.processorSupplier = Optional.of(processorSupplier); + this.processorSupplier.set(processorSupplier); return this; } @@ -214,7 +209,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override public FluentProducerTemplate to(Endpoint endpoint) { - this.endpoint = Optional.of(endpoint); + this.endpoint.set(endpoint); return this; } @@ -230,7 +225,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override @SuppressWarnings("unchecked") public <T> T request(Class<T> type) throws CamelExecutionException { - if (exchangeSupplier.isPresent()) { + if (exchangeSupplier.get() != null) { throw new IllegalArgumentException("withExchange not supported on FluentProducerTemplate.request method. Use send method instead."); } @@ -238,19 +233,19 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu final Endpoint target = target(); // Create the default processor if not provided. - final Supplier<Processor> processorSupplier = this.processorSupplier.orElse(() -> defaultProcessor()); + final Processor processorSupplier = this.processorSupplier.get() != null ? this.processorSupplier.get().get() : defaultProcessor(); T result; if (type == Exchange.class) { - result = (T)template().request(target, processorSupplier.get()); + result = (T)template().request(target, processorSupplier); } else if (type == Message.class) { - Exchange exchange = template().request(target, processorSupplier.get()); + Exchange exchange = template().request(target, processorSupplier); result = (T)exchange.getMessage(); } else { Exchange exchange = template().send( target, ExchangePattern.InOut, - processorSupplier.get(), + processorSupplier, resultProcessors.get(type) ); @@ -301,9 +296,13 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu // Determine the target endpoint final Endpoint target = target(); - return exchangeSupplier.isPresent() - ? template().send(target, exchangeSupplier.get().get()) - : template().send(target, processorSupplier.orElse(() -> defaultProcessor()).get()); + Exchange exchange = exchangeSupplier.get() != null ? exchangeSupplier.get().get() : null; + if (exchange != null) { + return template().send(target, exchange); + } else { + Processor processor = processorSupplier.get() != null ? processorSupplier.get().get() : defaultProcessor(); + return template().send(target, processor); + } } @Override @@ -311,9 +310,13 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu // Determine the target endpoint final Endpoint target = target(); - return exchangeSupplier.isPresent() - ? template().asyncSend(target, exchangeSupplier.get().get()) - : template().asyncSend(target, processorSupplier.orElse(() -> defaultAsyncProcessor()).get()); + Exchange exchange = exchangeSupplier.get() != null ? exchangeSupplier.get().get() : null; + if (exchange != null) { + return template().asyncSend(target, exchange); + } else { + Processor processor = processorSupplier.get() != null ? processorSupplier.get().get() : defaultAsyncProcessor(); + return template().asyncSend(target, processor); + } } // ************************ @@ -334,9 +337,13 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu if (template == null) { template = context.createProducerTemplate(maximumCacheSize); - defaultEndpoint.ifPresent(template::setDefaultEndpoint); + if (defaultEndpoint != null) { + template.setDefaultEndpoint(defaultEndpoint); + } template.setEventNotifierEnabled(eventNotifierEnabled); - templateCustomizer.ifPresent(tc -> tc.accept(template)); + if (templateCustomizer.get() != null) { + templateCustomizer.get().accept(template); + } } return template; @@ -360,11 +367,11 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu } private Endpoint target() { - if (endpoint.isPresent()) { + if (endpoint.get() != null) { return endpoint.get(); } - if (defaultEndpoint.isPresent()) { - return defaultEndpoint.get(); + if (defaultEndpoint != null) { + return defaultEndpoint; } throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)"); @@ -381,6 +388,11 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu @Override protected void doStop() throws Exception { clearAll(); + this.endpoint.remove(); + this.exchangeSupplier.remove(); + this.processorSupplier.remove(); + this.templateCustomizer.remove(); + ServiceHelper.stopService(template); } }