CAMEL-10208: Make FluentProducerTemplate similar to ProducerTemplate to get from CamelContext and use
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/687adda1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/687adda1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/687adda1 Branch: refs/heads/master Commit: 687adda1501edadf55375c4be16ac1bccdb82021 Parents: cbe9069 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Aug 1 13:35:25 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Aug 1 13:35:25 2016 +0200 ---------------------------------------------------------------------- .../java/org/apache/camel/CamelContext.java | 34 ++ .../apache/camel/FluentProducerTemplate.java | 266 ++++++++++++++ .../builder/DefaultFluentProducerTemplate.java | 334 +++++++++++++++++ .../camel/builder/FluentProducerTemplate.java | 361 ------------------- .../apache/camel/impl/DefaultCamelContext.java | 19 + .../builder/FluentProducerTemplateTest.java | 45 ++- .../jsonpath/JsonPathWithSimpleCBRTest.java | 10 +- .../camel/test/junit4/CamelTestSupport.java | 20 +- .../test/patterns/FilterFluentTemplateTest.java | 74 ++++ 9 files changed, 777 insertions(+), 386 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/camel-core/src/main/java/org/apache/camel/CamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java index e5d13fd..c4d6943 100644 --- a/camel-core/src/main/java/org/apache/camel/CamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java @@ -1070,6 +1070,40 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { ProducerTemplate createProducerTemplate(int maximumCacheSize); /** + * Creates a new {@link FluentProducerTemplate} which is <b>started</b> and therefore ready to 
use right away. + * <p/> + * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html"> + * Why does Camel use too many threads with ProducerTemplate?</a> + * <p/> + * <b>Important:</b> Make sure to call {@link org.apache.camel.FluentProducerTemplate#stop()} when you are done using the template, + * to clean up any resources. + * <p/> + * Will use cache size defined in Camel property with key {@link Exchange#MAXIMUM_CACHE_POOL_SIZE}. + * If no key was defined then it will fall back to a default size of 1000. + * You can also use the {@link org.apache.camel.FluentProducerTemplate#setMaximumCacheSize(int)} method to use a custom value + * before starting the template. + * + * @return the template + * @throws RuntimeCamelException is thrown if error starting the template + */ + FluentProducerTemplate createFluentProducerTemplate(); + + /** + * Creates a new {@link FluentProducerTemplate} which is <b>started</b> and therefore ready to use right away. + * <p/> + * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html"> + * Why does Camel use too many threads with ProducerTemplate?</a> + * <p/> + * <b>Important:</b> Make sure to call {@link FluentProducerTemplate#stop()} when you are done using the template, + * to clean up any resources. + * + * @param maximumCacheSize the maximum cache size + * @return the template + * @throws RuntimeCamelException is thrown if error starting the template + */ + FluentProducerTemplate createFluentProducerTemplate(int maximumCacheSize); + + /** * Creates a new {@link ConsumerTemplate} which is <b>started</b> and therefore ready to use right away. 
* <p/> * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html"> http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java new file mode 100644 index 0000000..c50fbe7 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +import java.util.concurrent.Future; +import java.util.function.*; + +public interface FluentProducerTemplate extends Service { + + /** + * Get the {@link CamelContext} + * + * @return camelContext the Camel context + */ + CamelContext getCamelContext(); + + // Configuration methods + // ----------------------------------------------------------------------- + + /** + * Gets the maximum cache size used in the backing cache pools. 
+ * + * @return the maximum cache size + */ + int getMaximumCacheSize(); + + /** + * Sets a custom maximum cache size to use in the backing cache pools. + * + * @param maximumCacheSize the custom maximum cache size + */ + void setMaximumCacheSize(int maximumCacheSize); + + /** + * Gets an approximated size of the current cached resources in the backing cache pools. + * + * @return the size of current cached resources + */ + int getCurrentCacheSize(); + + /** + * Get the default endpoint to use if none is specified + * + * @return the default endpoint instance + */ + Endpoint getDefaultEndpoint(); + + /** + * Sets the default endpoint to use if none is specified + * + * @param defaultEndpoint the default endpoint instance + */ + void setDefaultEndpoint(Endpoint defaultEndpoint); + + /** + * Sets the default endpoint uri to use if none is specified + * + * @param endpointUri the default endpoint uri + */ + void setDefaultEndpointUri(String endpointUri); + + /** + * Sets whether the {@link org.apache.camel.spi.EventNotifier} should be + * used by this {@link FluentProducerTemplate} to send events about the {@link Exchange} + * being sent. + * <p/> + * By default this is enabled. + * + * @param enabled <tt>true</tt> to enable, <tt>false</tt> to disable. + */ + void setEventNotifierEnabled(boolean enabled); + + /** + * Whether the {@link org.apache.camel.spi.EventNotifier} should be + * used by this {@link FluentProducerTemplate} to send events about the {@link Exchange} + * being sent. + * + * @return <tt>true</tt> if enabled, <tt>false</tt> otherwise + */ + boolean isEventNotifierEnabled(); + + /** + * Cleanup the cache (purging stale entries) + */ + void cleanUp(); + + // Fluent methods + // ----------------------------------------------------------------------- + + /** + * Set the header + * + * @param key the key of the header + * @param value the value of the header + */ + FluentProducerTemplate withHeader(String key, Object value); + + /** + * Remove the headers. 
+ */ + FluentProducerTemplate clearHeaders(); + + /** + * Set the message body + * + * @param body the body + */ + FluentProducerTemplate withBody(Object body); + + /** + * Set the message body after converting it to the given type + * + * @param body the body + * @param type the type which the body should be converted to + */ + FluentProducerTemplate withBodyAs(Object body, Class<?> type); + + /** + * Remove the body. + */ + FluentProducerTemplate clearBody(); + + /** + * To customize the producer template for advanced usage like to set the + * executor service to use. + * + * <pre> + * {@code + * DefaultFluentProducerTemplate.on(context) + * .withTemplateCustomizer( + * template -> { + * template.setExecutorService(myExecutor); + * template.setMaximumCacheSize(10); + * } + * ) + * .withBody("the body") + * .to("direct:start") + * .request() + * }</pre> + * + * Note that it is invoked only once. + * + * @param templateCustomizer the customizer + */ + FluentProducerTemplate withTemplateCustomizer(java.util.function.Consumer<ProducerTemplate> templateCustomizer); + + /** + * Set the exchange to use for send. + * + * @param exchange the exchange + */ + FluentProducerTemplate withExchange(Exchange exchange); + + /** + * Set the exchangeSupplier which will be invoked to get the exchange to be + * used for send. + * + * @param exchangeSupplier the supplier + */ + FluentProducerTemplate withExchange(Supplier<Exchange> exchangeSupplier); + + /** + * Set the processor to use for send/request. 
+ * + * <pre> + * {@code + * DefaultFluentProducerTemplate.on(context) + * .withProcessor( + * exchange -> { + * exchange.getIn().setHeader("Key1", "Val1"); + * exchange.getIn().setHeader("Key2", "Val2"); + * exchange.getIn().setBody("the body"); + * } + * ) + * .to("direct:start") + * .request() + * }</pre> + * + * @param processor the processor + * @return the template for chaining further calls + */ + FluentProducerTemplate withProcessor(Processor processor); + + /** + * Set the processorSupplier which will be invoked to get the processor to be + * used for send/request. + * + * @param processorSupplier the supplier + */ + FluentProducerTemplate withProcessor(Supplier<Processor> processorSupplier); + + /** + * Set the endpoint to send to + * + * @param endpointUri the endpoint URI to send to + */ + FluentProducerTemplate to(String endpointUri); + + /** + * Set the endpoint to send to + * + * @param endpoint the endpoint to send to + */ + FluentProducerTemplate to(Endpoint endpoint); + + /** + * Send to an endpoint returning any result output body. + * + * @return the result + * @throws CamelExecutionException is thrown if error occurred + */ + Object request() throws CamelExecutionException; + + /** + * Send to an endpoint. + * + * @param type the expected response type + * @return the result + * @throws CamelExecutionException is thrown if error occurred + */ + @SuppressWarnings("unchecked") + <T> T request(Class<T> type) throws CamelExecutionException; + + /** + * Sends asynchronously to the given endpoint. + * + * @return a handle to be used to get the response in the future + */ + Future<Object> asyncRequest(); + + /** + * Sends asynchronously to the given endpoint. + * + * @param type the expected response type + * @return a handle to be used to get the response in the future + */ + <T> Future<T> asyncRequest(Class<T> type); + + /** + * Send to an endpoint + * + * @throws CamelExecutionException is thrown if error occurred + */ + Exchange send() throws CamelExecutionException; + + /** + * Sends asynchronously to the given endpoint. 
+ * + * @return a handle to be used to get the response in the future + */ + Future<Exchange> asyncSend(); +} http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java new file mode 100644 index 0000000..283e991 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.builder; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.FluentProducerTemplate; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.processor.ConvertBodyProcessor; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; + +public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate { + private final CamelContext context; + private final ClassValue<ConvertBodyProcessor> resultProcessors; + private Map<String, Object> headers; + private Object body; + private Endpoint endpoint; + private Consumer<ProducerTemplate> templateCustomizer; + private Supplier<Exchange> exchangeSupplier; + private Supplier<Processor> processorSupplier; + private volatile ProducerTemplate template; + private Endpoint defaultEndpoint; + private int maximumCacheSize; + private boolean eventNotifierEnabled = true; + + public DefaultFluentProducerTemplate(CamelContext context) { + this.context = context; + this.headers = null; + this.body = null; + this.endpoint = null; + this.templateCustomizer = null; + this.exchangeSupplier = null; + this.processorSupplier = () -> this::populateExchange; + this.template = null; + this.resultProcessors = new ClassValue<ConvertBodyProcessor>() { + @Override + protected ConvertBodyProcessor computeValue(Class<?> type) { + return new ConvertBodyProcessor(type); + } + }; + } + + @Override + public CamelContext getCamelContext() { + return context; 
+ } + + @Override + public int getCurrentCacheSize() { + if (template == null) { + return 0; + } + return template.getCurrentCacheSize(); + } + + @Override + public void cleanUp() { + if (template != null) { + template.cleanUp(); + } + } + + @Override + public void setDefaultEndpointUri(String endpointUri) { + setDefaultEndpoint(getCamelContext().getEndpoint(endpointUri)); + } + + @Override + public Endpoint getDefaultEndpoint() { + return defaultEndpoint; + } + + @Override + public void setDefaultEndpoint(Endpoint defaultEndpoint) { + this.defaultEndpoint = defaultEndpoint; + } + + @Override + public int getMaximumCacheSize() { + return maximumCacheSize; + } + + @Override + public void setMaximumCacheSize(int maximumCacheSize) { + this.maximumCacheSize = maximumCacheSize; + } + + @Override + public boolean isEventNotifierEnabled() { + return eventNotifierEnabled; + } + + @Override + public void setEventNotifierEnabled(boolean eventNotifierEnabled) { + this.eventNotifierEnabled = eventNotifierEnabled; + } + + @Override + public FluentProducerTemplate withHeader(String key, Object value) { + if (headers == null) { + headers = new HashMap<>(); + } + + headers.put(key, value); + + return this; + } + + @Override + public FluentProducerTemplate clearHeaders() { + if (headers != null) { + headers.clear(); + } + + return this; + } + + @Override + public FluentProducerTemplate withBody(Object body) { + this.body = body; + + return this; + } + + @Override + public FluentProducerTemplate withBodyAs(Object body, Class<?> type) { + this.body = type != null + ? 
context.getTypeConverter().convertTo(type, body) + : body; + + return this; + } + + @Override + public FluentProducerTemplate clearBody() { + this.body = null; + + return this; + } + + @Override + public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) { + this.templateCustomizer = templateCustomizer; + return this; + } + + @Override + public FluentProducerTemplate withExchange(final Exchange exchange) { + return withExchange(() -> exchange); + } + + @Override + public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) { + this.exchangeSupplier = exchangeSupplier; + return this; + } + + @Override + public FluentProducerTemplate withProcessor(final Processor processor) { + return withProcessor(() -> processor); + } + + @Override + public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) { + this.processorSupplier = processorSupplier; + return this; + } + + @Override + public FluentProducerTemplate to(String endpointUri) { + return to(context.getEndpoint(endpointUri)); + } + + @Override + public FluentProducerTemplate to(Endpoint endpoint) { + this.endpoint = endpoint; + return this; + } + + // ************************ + // REQUEST + // ************************ + + @Override + public Object request() throws CamelExecutionException { + return request(Object.class); + } + + @Override + @SuppressWarnings("unchecked") + public <T> T request(Class<T> type) throws CamelExecutionException { + T result; + Endpoint target = endpoint != null ? endpoint : defaultEndpoint; + + if (type == Exchange.class) { + result = (T)template().request(target, processorSupplier.get()); + } else if (type == Message.class) { + Exchange exchange = template().request(target, processorSupplier.get()); + result = exchange.hasOut() ? 
(T)exchange.getOut() : (T)exchange.getIn(); + } else { + Exchange exchange = template().send( + target, + ExchangePattern.InOut, + processorSupplier.get(), + resultProcessors.get(type) + ); + + result = context.getTypeConverter().convertTo( + type, + ExchangeHelper.extractResultBody(exchange, exchange.getPattern()) + ); + } + + return result; + } + + @Override + public Future<Object> asyncRequest() { + return asyncRequest(Object.class); + } + + @Override + public <T> Future<T> asyncRequest(Class<T> type) { + Endpoint target = endpoint != null ? endpoint : defaultEndpoint; + Future<T> result; + if (headers != null) { + result = template().asyncRequestBodyAndHeaders(target, body, headers, type); + } else { + result = template().asyncRequestBody(target, body, type); + } + + return result; + } + + // ************************ + // SEND + // ************************ + + @Override + public Exchange send() throws CamelExecutionException { + Endpoint target = endpoint != null ? endpoint : defaultEndpoint; + return exchangeSupplier != null + ? template().send(target, exchangeSupplier.get()) + : template().send(target, processorSupplier.get()); + } + + @Override + public Future<Exchange> asyncSend() { + Endpoint target = endpoint != null ? endpoint : defaultEndpoint; + return exchangeSupplier != null + ? template().asyncSend(target, exchangeSupplier.get()) + : template().asyncSend(target, processorSupplier.get()); + } + + // ************************ + // HELPERS + // ************************ + + /** + * Create the FluentProducerTemplate by setting the camel context + * + * @param context the camel context + */ + public static FluentProducerTemplate on(CamelContext context) { + return new DefaultFluentProducerTemplate(context); + } + + private ProducerTemplate template() { + ObjectHelper.notNull(context, "CamelContext"); + + if (template == null) { + template = maximumCacheSize > 0 ? 
context.createProducerTemplate(maximumCacheSize) : context.createProducerTemplate(); + if (defaultEndpoint != null) { + template.setDefaultEndpoint(defaultEndpoint); + } + template.setEventNotifierEnabled(eventNotifierEnabled); + if (templateCustomizer != null) { + templateCustomizer.accept(template); + } + } + + return template; + } + + private void populateExchange(Exchange exchange) throws Exception { + if (headers != null && !headers.isEmpty()) { + exchange.getIn().getHeaders().putAll(headers); + } + if (body != null) { + exchange.getIn().setBody(body); + } + } + + @Override + protected void doStart() throws Exception { + if (template == null) { + template = template(); + } + ServiceHelper.startService(template); + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopService(template); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java deleted file mode 100644 index e582d9f..0000000 --- a/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java +++ /dev/null @@ -1,361 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.builder; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Future; -import java.util.function.Consumer; -import java.util.function.Supplier; - -import org.apache.camel.CamelContext; -import org.apache.camel.CamelExecutionException; -import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.Message; -import org.apache.camel.Processor; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.processor.ConvertBodyProcessor; -import org.apache.camel.util.ExchangeHelper; -import org.apache.camel.util.ObjectHelper; - -public class FluentProducerTemplate { - private final CamelContext context; - private final ClassValue<ConvertBodyProcessor> resultProcessors; - private Map<String, Object> headers; - private Object body; - private Endpoint endpoint; - private Consumer<ProducerTemplate> templateCustomizer; - private Supplier<Exchange> exchangeSupplier; - private Supplier<Processor> processorSupplier; - private ProducerTemplate template; - - public FluentProducerTemplate(CamelContext context) { - this.context = context; - this.headers = null; - this.body = null; - this.endpoint = null; - this.templateCustomizer = null; - this.exchangeSupplier = null; - this.processorSupplier = () -> this::populateExchange; - this.template = null; - this.resultProcessors = new ClassValue<ConvertBodyProcessor>() { - @Override - protected ConvertBodyProcessor computeValue(Class<?> type) { - return new ConvertBodyProcessor(type); - } 
- }; - } - - /** - * Set the header - * - * @param key the key of the header - * @param value the value of the header - */ - public FluentProducerTemplate withHeader(String key, Object value) { - if (headers == null) { - headers = new HashMap<>(); - } - - headers.put(key, value); - - return this; - } - - /** - * Remove the headers. - */ - public FluentProducerTemplate clearHeaders() { - if (headers != null) { - headers.clear(); - } - - return this; - } - - /** - * Set the message body - * - * @param body the body - */ - public FluentProducerTemplate withBody(Object body) { - this.body = body; - - return this; - } - - /** - * Set the message body after converting it to the given type - * - * @param body the body - * @param type the type which the body should be converted to - */ - public FluentProducerTemplate withBodyAs(Object body, Class<?> type) { - this.body = type != null - ? context.getTypeConverter().convertTo(type, body) - : body; - - return this; - } - - /** - * Remove the body. - */ - public FluentProducerTemplate clearBody() { - this.body = null; - - return this; - } - - /** - * To customize the producer template for advanced usage like to set the - * executor service to use. - * - * <pre> - * {@code - * FluentProducerTemplate.on(context) - * .withTemplateCustomizer( - * template -> { - * template.setExecutorService(myExecutor); - * template.setMaximumCacheSize(10); - * } - * ) - * .withBody("the body") - * .to("direct:start") - * .request() - * </pre> - * - * Note that it is invoked only once. - * - * @param templateCustomizer the customizer - */ - public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) { - this.templateCustomizer = templateCustomizer; - return this; - } - - /** - * Set the exchange to use for send. 
- * - * @param exchange the exchange - */ - public FluentProducerTemplate withExchange(final Exchange exchange) { - return withExchange(() -> exchange); - } - - /** - * Set the exchangeSupplier which will be invoke to get the exchange to be - * used for send. - * - * @param exchangeSupplier the supplier - */ - public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) { - this.exchangeSupplier = exchangeSupplier; - return this; - } - - /** - * Set the processor to use for send/request. - * - * <pre> - * {@code - * FluentProducerTemplate.on(context) - * .withProcessor( - * exchange -> { - * exchange.getIn().setHeader("Key1", "Val1") - * exchange.getIn().setHeader("Key2", "Val2") - * exchange.getIn().setBody("the body") - * } - * ) - * .to("direct:start") - * .request() - * </pre> - * - * @param processor - * @return - */ - public FluentProducerTemplate withProcessor(final Processor processor) { - return withProcessor(() -> processor); - } - - /** - * Set the processorSupplier which will be invoke to get the processor to be - * used for send/request. - * - * @param processorSupplier the supplier - */ - public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) { - this.processorSupplier = processorSupplier; - return this; - } - - /** - * Set the message body - * - * @param endpointUri the endpoint URI to send to - */ - public FluentProducerTemplate to(String endpointUri) { - return to(context.getEndpoint(endpointUri)); - } - - /** - * Set the message body - * - * @param endpoint the endpoint to send to - */ - public FluentProducerTemplate to(Endpoint endpoint) { - this.endpoint = endpoint; - return this; - } - - // ************************ - // REQUEST - // ************************ - - /** - * Send to an endpoint returning any result output body. 
- * - * @return the result - * @throws CamelExecutionException is thrown if error occurred - */ - public Object request() throws CamelExecutionException { - return request(Object.class); - } - - /** - * Send to an endpoint. - * - * @param type the expected response type - * @return the result - * @throws CamelExecutionException is thrown if error occurred - */ - @SuppressWarnings("unchecked") - public <T> T request(Class<T> type) throws CamelExecutionException { - T result; - if (type == Exchange.class) { - result = (T)template().request(endpoint, processorSupplier.get()); - } else if (type == Message.class) { - Exchange exchange = template().request(endpoint, processorSupplier.get()); - result = exchange.hasOut() ? (T)exchange.getOut() : (T)exchange.getIn(); - } else { - Exchange exchange = template().send( - endpoint, - ExchangePattern.InOut, - processorSupplier.get(), - resultProcessors.get(type) - ); - - result = context.getTypeConverter().convertTo( - type, - ExchangeHelper.extractResultBody(exchange, exchange.getPattern()) - ); - } - - return result; - } - - /** - * Sends asynchronously to the given endpoint. - * - * @return a handle to be used to get the response in the future - */ - public Future<Object> asyncRequest() { - return asyncRequest(Object.class); - } - - /** - * Sends asynchronously to the given endpoint. 
- * - * @param type the expected response type - * @return a handle to be used to get the response in the future - */ - public <T> Future<T> asyncRequest(Class<T> type) { - Future<T> result; - if (headers != null) { - result = template().asyncRequestBodyAndHeaders(endpoint, body, headers, type); - } else { - result = template().asyncRequestBody(endpoint, body, type); - } - - return result; - } - - // ************************ - // SEND - // ************************ - - /** - * Send to an endpoint - * - * @throws CamelExecutionException is thrown if error occurred - */ - public Exchange send() throws CamelExecutionException { - return exchangeSupplier != null - ? template().send(endpoint, exchangeSupplier.get()) - : template().send(endpoint, processorSupplier.get()); - } - - /** - * Sends asynchronously to the given endpoint. - * - * @return a handle to be used to get the response in the future - */ - public Future<Exchange> asyncSend() { - return exchangeSupplier != null - ? template().asyncSend(endpoint, exchangeSupplier.get()) - : template().asyncSend(endpoint, processorSupplier.get()); - } - - // ************************ - // HELPERS - // ************************ - - /** - * Create the FluentProducerTemplate by setting the camel context - * - * @param context the camel context - */ - public static FluentProducerTemplate on(CamelContext context) { - return new FluentProducerTemplate(context); - } - - private ProducerTemplate template() { - ObjectHelper.notNull(context, "camel-context"); - ObjectHelper.notNull(endpoint, "endpoint"); - - if (this.template == null) { - template = context.createProducerTemplate(); - if (templateCustomizer != null) { - templateCustomizer.accept(template); - } - } - - return template; - } - - private void populateExchange(Exchange exchange) throws Exception { - if (headers != null && !headers.isEmpty()) { - exchange.getIn().getHeaders().putAll(headers); - } - if (body != null) { - exchange.getIn().setBody(body); - } - } - -} 
http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index c325e89..3d0f13b 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -56,6 +56,7 @@ import org.apache.camel.ConsumerTemplate; import org.apache.camel.Endpoint; import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.FailedToStartRouteException; +import org.apache.camel.FluentProducerTemplate; import org.apache.camel.IsSingleton; import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.NamedNode; @@ -82,6 +83,7 @@ import org.apache.camel.VetoCamelContextStartException; import org.apache.camel.api.management.mbean.ManagedCamelContextMBean; import org.apache.camel.api.management.mbean.ManagedProcessorMBean; import org.apache.camel.api.management.mbean.ManagedRouteMBean; +import org.apache.camel.builder.DefaultFluentProducerTemplate; import org.apache.camel.builder.ErrorHandlerBuilder; import org.apache.camel.builder.ErrorHandlerBuilderSupport; import org.apache.camel.component.properties.PropertiesComponent; @@ -2705,6 +2707,23 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon return answer; } + public FluentProducerTemplate createFluentProducerTemplate() { + int size = CamelContextHelper.getMaximumCachePoolSize(this); + return createFluentProducerTemplate(size); + } + + public FluentProducerTemplate createFluentProducerTemplate(int maximumCacheSize) { + DefaultFluentProducerTemplate answer = new DefaultFluentProducerTemplate(this); + answer.setMaximumCacheSize(maximumCacheSize); + // start it so its ready to use + try { + 
startService(answer); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + return answer; + } + public ConsumerTemplate createConsumerTemplate() { int size = CamelContextHelper.getMaximumCachePoolSize(this); return createConsumerTemplate(size); http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java index 29d8dc0..412e08d 100644 --- a/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java +++ b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java @@ -20,19 +20,38 @@ import org.apache.camel.CamelExecutionException; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.FluentProducerTemplate; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.mock.MockEndpoint; /** - * Unit test for DefaultProducerTemplate + * Unit test for FluentProducerTemplate */ public class FluentProducerTemplateTest extends ContextTestSupport { + public void testFromCamelContext() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Bye World"); + + FluentProducerTemplate fluent = context.createFluentProducerTemplate(); + + Object result = fluent + .withBody("Hello World") + .to("direct:in") + .request(); + + assertMockEndpointsSatisfied(); + + assertEquals("Bye World", result); + + assertSame(context, fluent.getCamelContext()); + } + public void testIn() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("Bye World"); - Object result = 
FluentProducerTemplate.on(context) + Object result = DefaultFluentProducerTemplate.on(context) .withBody("Hello World") .to("direct:in") .request(); @@ -48,7 +67,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("Bye Bye World"); - Object result = FluentProducerTemplate.on(context) + Object result = DefaultFluentProducerTemplate.on(context) .withBody("Hello World") .to("direct:out") .request(); @@ -62,7 +81,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived(11); - Object result = FluentProducerTemplate.on(context) + Object result = DefaultFluentProducerTemplate.on(context) .withBodyAs("10", Integer.class) .to("direct:sum") .request(); @@ -77,7 +96,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport { mock.expectedMessageCount(0); try { - FluentProducerTemplate.on(context) + DefaultFluentProducerTemplate.on(context) .withBodyAs("10", Double.class) .to("direct:sum") .request(); @@ -93,7 +112,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(0); - Object result = FluentProducerTemplate.on(context) + Object result = DefaultFluentProducerTemplate.on(context) .withBody("Hello World") .to("direct:fault") .request(); @@ -107,7 +126,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(0); - Exchange out = FluentProducerTemplate.on(context) + Exchange out = DefaultFluentProducerTemplate.on(context) .withBody("Hello World") .to("direct:exception") .send(); @@ -123,7 +142,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(0); - Exchange out = 
FluentProducerTemplate.on(context) + Exchange out = DefaultFluentProducerTemplate.on(context) .withProcessor(exchange -> exchange.getIn().setBody("Hello World")) .to("direct:exception") .send(); @@ -138,7 +157,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(0); - Exchange out = FluentProducerTemplate.on(context) + Exchange out = DefaultFluentProducerTemplate.on(context) .withExchange(() -> { Exchange exchange = context.getEndpoint("direct:exception").createExchange(); exchange.getIn().setBody("Hello World"); @@ -158,7 +177,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport { mock.expectedMessageCount(0); try { - FluentProducerTemplate.on(context) + DefaultFluentProducerTemplate.on(context) .withBody("Hello World") .to("direct:exception") .request(); @@ -176,7 +195,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(0); - Exchange out = FluentProducerTemplate.on(context) + Exchange out = DefaultFluentProducerTemplate.on(context) .withProcessor(exchange -> exchange.getIn().setBody("Hello World")) .to("direct:exception") .request(Exchange.class); @@ -191,7 +210,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(0); - Exchange out = FluentProducerTemplate.on(context) + Exchange out = DefaultFluentProducerTemplate.on(context) .withExchange(() -> { Exchange exchange = context.getEndpoint("direct:exception").createExchange(ExchangePattern.InOut); exchange.getIn().setBody("Hello World"); @@ -208,7 +227,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport { public void testRequestBody() throws Exception { // with endpoint as string uri - FluentProducerTemplate template = FluentProducerTemplate.on(context); + FluentProducerTemplate 
template = DefaultFluentProducerTemplate.on(context); final Integer expectedResult = new Integer(123); http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathWithSimpleCBRTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathWithSimpleCBRTest.java b/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathWithSimpleCBRTest.java index eb9bbf9..f1b1029 100644 --- a/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathWithSimpleCBRTest.java +++ b/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathWithSimpleCBRTest.java @@ -18,7 +18,6 @@ package org.apache.camel.jsonpath; import java.io.File; -import org.apache.camel.builder.FluentProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; @@ -48,8 +47,7 @@ public class JsonPathWithSimpleCBRTest extends CamelTestSupport { getMockEndpoint("mock:average").expectedMessageCount(0); getMockEndpoint("mock:expensive").expectedMessageCount(0); - FluentProducerTemplate fluent = new FluentProducerTemplate(context); - fluent.withHeader("cheap", 10).withHeader("average", 30).withBody(new File("src/test/resources/cheap.json")) + fluentTemplate.withHeader("cheap", 10).withHeader("average", 30).withBody(new File("src/test/resources/cheap.json")) .to("direct:start").send(); assertMockEndpointsSatisfied(); @@ -61,8 +59,7 @@ public class JsonPathWithSimpleCBRTest extends CamelTestSupport { getMockEndpoint("mock:average").expectedMessageCount(1); getMockEndpoint("mock:expensive").expectedMessageCount(0); - FluentProducerTemplate fluent = new FluentProducerTemplate(context); - fluent.withHeader("cheap", 10).withHeader("average", 30).withBody(new File("src/test/resources/average.json")) + 
fluentTemplate.withHeader("cheap", 10).withHeader("average", 30).withBody(new File("src/test/resources/average.json")) .to("direct:start").send(); assertMockEndpointsSatisfied(); @@ -74,8 +71,7 @@ public class JsonPathWithSimpleCBRTest extends CamelTestSupport { getMockEndpoint("mock:average").expectedMessageCount(0); getMockEndpoint("mock:expensive").expectedMessageCount(1); - FluentProducerTemplate fluent = new FluentProducerTemplate(context); - fluent.withHeader("cheap", 10).withHeader("average", 30).withBody(new File("src/test/resources/expensive.json")) + fluentTemplate.withHeader("cheap", 10).withHeader("average", 30).withBody(new File("src/test/resources/expensive.json")) .to("direct:start").send(); assertMockEndpointsSatisfied(); http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java b/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java index 03fd1fe..34a18ed 100644 --- a/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java +++ b/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java @@ -44,7 +44,7 @@ import org.apache.camel.Service; import org.apache.camel.ServiceStatus; import org.apache.camel.api.management.mbean.ManagedCamelContextMBean; import org.apache.camel.builder.AdviceWithRouteBuilder; -import org.apache.camel.builder.FluentProducerTemplate; +import org.apache.camel.FluentProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.properties.PropertiesComponent; @@ -79,6 +79,7 @@ public abstract class CamelTestSupport extends TestSupport { private static final ThreadLocal<Boolean> INIT = new 
ThreadLocal<Boolean>(); private static ThreadLocal<ModelCamelContext> threadCamelContext = new ThreadLocal<ModelCamelContext>(); private static ThreadLocal<ProducerTemplate> threadTemplate = new ThreadLocal<ProducerTemplate>(); + private static ThreadLocal<FluentProducerTemplate> threadFluentTemplate = new ThreadLocal<FluentProducerTemplate>(); private static ThreadLocal<ConsumerTemplate> threadConsumer = new ThreadLocal<ConsumerTemplate>(); private static ThreadLocal<Service> threadService = new ThreadLocal<Service>(); protected volatile ModelCamelContext context; @@ -296,11 +297,13 @@ public abstract class CamelTestSupport extends TestSupport { template = context.createProducerTemplate(); template.start(); + fluentTemplate = context.createFluentProducerTemplate(); + fluentTemplate.start(); consumer = context.createConsumerTemplate(); consumer.start(); - fluentTemplate = FluentProducerTemplate.on(context()); threadTemplate.set(template); + threadFluentTemplate.set(fluentTemplate); threadConsumer.set(consumer); // enable auto mocking if enabled @@ -404,7 +407,7 @@ public abstract class CamelTestSupport extends TestSupport { } LOG.debug("tearDown test"); - doStopTemplates(consumer, template); + doStopTemplates(consumer, template, fluentTemplate); doStopCamelContext(context, camelContextService); } @@ -412,7 +415,7 @@ public abstract class CamelTestSupport extends TestSupport { public static void tearDownAfterClass() throws Exception { INIT.remove(); LOG.debug("tearDownAfterClass test"); - doStopTemplates(threadConsumer.get(), threadTemplate.get()); + doStopTemplates(threadConsumer.get(), threadTemplate.get(), threadFluentTemplate.get()); doStopCamelContext(threadCamelContext.get(), threadService.get()); } @@ -489,6 +492,7 @@ public abstract class CamelTestSupport extends TestSupport { protected void postProcessTest() throws Exception { context = threadCamelContext.get(); template = threadTemplate.get(); + fluentTemplate = threadFluentTemplate.get(); consumer = 
threadConsumer.get(); camelContextService = threadService.get(); applyCamelPostProcessor(); @@ -527,7 +531,7 @@ public abstract class CamelTestSupport extends TestSupport { } } - private static void doStopTemplates(ConsumerTemplate consumer, ProducerTemplate template) throws Exception { + private static void doStopTemplates(ConsumerTemplate consumer, ProducerTemplate template, FluentProducerTemplate fluentTemplate) throws Exception { if (consumer != null) { if (consumer == threadConsumer.get()) { threadConsumer.remove(); @@ -540,6 +544,12 @@ public abstract class CamelTestSupport extends TestSupport { } template.stop(); } + if (fluentTemplate != null) { + if (fluentTemplate == threadFluentTemplate.get()) { + threadFluentTemplate.remove(); + } + fluentTemplate.stop(); + } } protected void startCamelContext() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/components/camel-test/src/test/java/org/apache/camel/test/patterns/FilterFluentTemplateTest.java ---------------------------------------------------------------------- diff --git a/components/camel-test/src/test/java/org/apache/camel/test/patterns/FilterFluentTemplateTest.java b/components/camel-test/src/test/java/org/apache/camel/test/patterns/FilterFluentTemplateTest.java new file mode 100644 index 0000000..26ee527 --- /dev/null +++ b/components/camel-test/src/test/java/org/apache/camel/test/patterns/FilterFluentTemplateTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.test.patterns; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Tests filtering using Camel Test + * + * @version + */ +// START SNIPPET: example +// tag::example[] +public class FilterFluentTemplateTest extends CamelTestSupport { + + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @Override + public boolean isDumpRouteCoverage() { + return true; + } + + @Test + public void testSendMatchingMessage() throws Exception { + String expectedBody = "<matched/>"; + + resultEndpoint.expectedBodiesReceived(expectedBody); + + fluentTemplate.withBody(expectedBody).withHeader("foo", "bar").to("direct:start").send(); + + resultEndpoint.assertIsSatisfied(); + } + + @Test + public void testSendNotMatchingMessage() throws Exception { + resultEndpoint.expectedMessageCount(0); + + fluentTemplate.withBody("<notMatched/>").withHeader("foo", "notMatchedHeaderValue").to("direct:start").send();; + + resultEndpoint.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:start").filter(header("foo").isEqualTo("bar")).to("mock:result"); + } + }; + } +} +// end::example[] +// END SNIPPET: example