Repository: camel Updated Branches: refs/heads/master c66552d99 -> 473935b53
CAMEL-9602 - ProducerTemplateBuilder Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/473935b5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/473935b5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/473935b5 Branch: refs/heads/master Commit: 473935b535c972b03562d2e4656bb294b5ce0a14 Parents: c66552d Author: lburgazzoli <lburgazz...@gmail.com> Authored: Tue Mar 1 14:00:59 2016 +0100 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Wed Apr 27 10:14:58 2016 +0200 ---------------------------------------------------------------------- .../camel/builder/FluentProducerTemplate.java | 351 +++++++++++++++++++ .../builder/FluentProducerTemplateTest.java | 291 +++++++++++++++ 2 files changed, 642 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/473935b5/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java new file mode 100644 index 0000000..bb8ae98 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.builder; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.ObjectHelper; + +public class FluentProducerTemplate { + private final CamelContext context; + private Map<String, Object> headers; + private Object body; + private Endpoint endpoint; + private Consumer<ProducerTemplate> templateCustomizer; + private Supplier<Exchange> exchangeSupplier; + private Supplier<Processor> processorSupplier; + private ProducerTemplate template; + + public FluentProducerTemplate(CamelContext context) { + this.context = context; + this.headers = null; + this.body = null; + this.endpoint = null; + this.templateCustomizer = null; + this.exchangeSupplier = null; + this.processorSupplier = () -> this::populateExchange; + this.template = null; + } + + /** + * Set the header + * + * @param key the key of the header + * @param value the value of the header + * @return ProducerTemplate builder + */ + public FluentProducerTemplate withHeader(String key, Object value) { + if (headers == null) { + headers = new HashMap<>(); + } + + headers.put(key, value); + + return this; + } + + /** + * Remove the headers. + * + * @return ProducerTemplate builder + */ + public FluentProducerTemplate clearHeaders() { + if (headers != null) { + headers.clear(); + } + + return this; + } + + /** + * Set the message body + * + * @param body the body + * @return ProducerTemplate builder + */ + public FluentProducerTemplate withBody(Object body) { + this.body = body; + + return this; + } + + /** + * Remove the body. + * + * @return ProducerTemplate builder + */ + public FluentProducerTemplate clearBody() { + this.body = null; + + return this; + } + + /** + * To customize the producer template for advanced usage like to set the + * executor service to use. + * + * <pre> + * {@code + * FluentProducerTemplate.on(context) + * .withTemplateCustomizer( + * template -> { + * template.setExecutorService(myExecutor); + * template.setMaximumCacheSize(10); + * } + * ) + * .withBody("the body") + * .to("direct:start") + * .request() + * </pre> + * + * Note that it is invoked only once. + * + * @param templateCustomizer + * @return + */ + public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) { + this.templateCustomizer = templateCustomizer; + return this; + } + + /** + * Set the exchange to use for send. + * + * @param exchange + * @return + */ + public FluentProducerTemplate withExchange(final Exchange exchange) { + return withExchange(() -> exchange); + } + + /** + * Set the exchangeSupplier which will be invoke to get the exchange to be + * used for send. + * + * @param exchangeSupplier + * @return + */ + public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) { + this.exchangeSupplier = exchangeSupplier; + return this; + } + + /** + * Set the processor to use for send/request. + * + * <pre> + * {@code + * FluentProducerTemplate.on(context) + * .withProcessor( + * exchange -> { + * exchange.getIn().setHeader("Key1", "Val1") + * exchange.getIn().setHeader("Key2", "Val2") + * exchange.getIn().setBody("the body") + * } + * ) + * .to("direct:start") + * .request() + * </pre> + * + * @param processor + * @return + */ + public FluentProducerTemplate withProcessor(final Processor processor) { + return withProcessor(() -> processor); + } + + /** + * Set the processorSupplier which will be invoke to get the processor to be + * used for send/request. + * + * @param processorSupplier + * @return + */ + public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) { + this.processorSupplier = processorSupplier; + return this; + } + + /** + * Set the message body + * + * @param endpointUri the endpoint URI to send to + * @return ProducerTemplate builder + */ + public FluentProducerTemplate to(String endpointUri) { + return to(context.getEndpoint(endpointUri)); + } + + /** + * Set the message body + * + * @param endpoint the endpoint to send to + * @return ProducerTemplate builder + */ + public FluentProducerTemplate to(Endpoint endpoint) { + this.endpoint = endpoint; + return this; + } + + // ************************ + // REQUEST + // ************************ + + /** + * Send to an endpoint returning any result output body. + * + * @return the result + * @throws CamelExecutionException + */ + public Object request() throws CamelExecutionException { + return request(Object.class); + } + + /** + * Send to an endpoint. + * + * @param type the expected response type + * @return the result + * @throws CamelExecutionException + */ + @SuppressWarnings("unchecked") + public <T> T request(Class<T> type) throws CamelExecutionException { + T result; + if (type == Exchange.class) { + result = (T)template().request(endpoint, processorSupplier.get()); + } else if (type == Message.class) { + Exchange exchange = template().request(endpoint, processorSupplier.get()); + result = exchange.hasOut() ? (T)exchange.getOut() : (T)exchange.getIn(); + } else { + Exchange exchange = template().send(endpoint, ExchangePattern.InOut, processorSupplier.get()); + result = context.getTypeConverter().convertTo( + type, + ExchangeHelper.extractResultBody(exchange, exchange.getPattern()) + ); + } + + return result; + } + + /** + * Sends asynchronously to the given endpoint. + * + * @return a handle to be used to get the response in the future + */ + public Future<Object> asyncRequest() { + return asyncRequest(Object.class); + } + + /** + * Sends asynchronously to the given endpoint. + * + * @param type the expected response type + * @return a handle to be used to get the response in the future + */ + public <T> Future<T> asyncRequest(Class<T> type) { + Future<T> result; + if (headers != null) { + result = template().asyncRequestBodyAndHeaders(endpoint, body, headers, type); + } else { + result = template().asyncRequestBody(endpoint, body, type); + } + + return result; + } + + // ************************ + // SEND + // ************************ + + /** + * Send to an endpoint + * + * @throws CamelExecutionException + */ + public Exchange send() throws CamelExecutionException { + Exchange result = exchangeSupplier != null + ? template().send(endpoint, exchangeSupplier.get()) + : template().send(endpoint, processorSupplier.get()); + + // TODO: validate + // must invoke extract result body in case of exception to be rethrown + //ExchangeHelper.extractResultBody(result, null); + + return result; + } + + /** + * Sends asynchronously to the given endpoint. + * + * @return a handle to be used to get the response in the future + */ + public Future<Exchange> asyncSend() { + return exchangeSupplier != null + ? template().asyncSend(endpoint, exchangeSupplier.get()) + : template().asyncSend(endpoint, processorSupplier.get()); + } + + // ************************ + // HELPERS + // ************************ + + /** + * Create the FluentProducerTemplate by setting the camel context + * + * @param context the camel context + * @return ProducerTemplate builder + */ + public static FluentProducerTemplate on(CamelContext context) { + return new FluentProducerTemplate(context); + } + + private ProducerTemplate template() { + ObjectHelper.notNull(context, "camel-context"); + ObjectHelper.notNull(endpoint, "endpoint"); + + if (this.template == null) { + template = context.createProducerTemplate(); + if (templateCustomizer != null) { + templateCustomizer.accept(template); + } + } + + return template; + } + + private void populateExchange(Exchange exchange) throws Exception { + if (headers != null && !headers.isEmpty()) { + exchange.getIn().getHeaders().putAll(headers); + } + if (body != null) { + exchange.getIn().setBody(body); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/473935b5/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java new file mode 100644 index 0000000..27504f3 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.builder; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * Unit test for DefaultProducerTemplate + */ +public class FluentProducerTemplateTest extends ContextTestSupport { + + public void testIn() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Bye World"); + + Object result = FluentProducerTemplate.on(context) + .withBody("Hello World") + .to("direct:in") + .request(); + + assertMockEndpointsSatisfied(); + + assertEquals("Bye World", result); + + assertSame(context, template.getCamelContext()); + } + + public void testInOut() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Bye Bye World"); + + Object result = FluentProducerTemplate.on(context) + .withBody("Hello World") + .to("direct:out") + .request(); + + assertMockEndpointsSatisfied(); + + assertEquals("Bye Bye World", result); + } + + public void testFault() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + + Object result = FluentProducerTemplate.on(context) + .withBody("Hello World") + .to("direct:fault") + .request(); + + assertMockEndpointsSatisfied(); + + assertEquals("Faulty World", result); + } + + // TODO: to review + public void testExceptionUsingBody() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + + Exchange out = FluentProducerTemplate.on(context) + .withBody("Hello World") + .to("direct:exception") + .send(); + + assertTrue(out.isFailed()); + assertTrue(out.getException() instanceof IllegalArgumentException); + assertEquals("Forced exception by unit test", out.getException().getMessage()); + + /* + try { + Exchange out = FluentProducerTemplate.on(context) + .withBody("Hello World") + .to("direct:exception") + .send(); + + assertTrue(out.isFailed()); + fail("Should have thrown RuntimeCamelException"); + } catch (RuntimeCamelException e) { + assertTrue(e.getCause() instanceof IllegalArgumentException); + assertEquals("Forced exception by unit test", e.getCause().getMessage()); + } + */ + + assertMockEndpointsSatisfied(); + } + + // TODO: to review + public void testExceptionUsingProcessor() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + + Exchange out = FluentProducerTemplate.on(context) + .withProcessor(exchange -> exchange.getIn().setBody("Hello World")) + .to("direct:exception") + .send(); + + assertTrue(out.isFailed()); + assertEquals("Forced exception by unit test", out.getException().getMessage()); + + assertMockEndpointsSatisfied(); + } + + public void testExceptionUsingExchange() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + + Exchange out = FluentProducerTemplate.on(context) + .withExchange(() -> { + Exchange exchange = context.getEndpoint("direct:exception").createExchange(); + exchange.getIn().setBody("Hello World"); + + return exchange; + }) + .to("direct:exception") + .send(); + + assertTrue(out.isFailed()); + assertEquals("Forced exception by unit test", out.getException().getMessage()); + + assertMockEndpointsSatisfied(); + } + + public void testRequestExceptionUsingBody() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + + try { + FluentProducerTemplate.on(context) + .withBody("Hello World") + .to("direct:exception") + .request(); + + fail("Should have thrown RuntimeCamelException"); + } catch (RuntimeCamelException e) { + assertTrue(e.getCause() instanceof IllegalArgumentException); + assertEquals("Forced exception by unit test", e.getCause().getMessage()); + } + + assertMockEndpointsSatisfied(); + } + + public void testRequestExceptionUsingProcessor() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + + Exchange out = FluentProducerTemplate.on(context) + .withProcessor(exchange -> exchange.getIn().setBody("Hello World")) + .to("direct:exception") + .request(Exchange.class); + + assertTrue(out.isFailed()); + assertEquals("Forced exception by unit test", out.getException().getMessage()); + + assertMockEndpointsSatisfied(); + } + + public void testRequestExceptionUsingExchange() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + + Exchange out = FluentProducerTemplate.on(context) + .withExchange(() -> { + Exchange exchange = context.getEndpoint("direct:exception").createExchange(ExchangePattern.InOut); + exchange.getIn().setBody("Hello World"); + + return exchange; + }) + .to("direct:exception") + .send(); + + assertTrue(out.isFailed()); + assertEquals("Forced exception by unit test", out.getException().getMessage()); + + assertMockEndpointsSatisfied(); + } + + public void testRequestBody() throws Exception { + // with endpoint as string uri + FluentProducerTemplate template = FluentProducerTemplate.on(context); + + final Integer expectedResult = new Integer(123); + + assertEquals( + expectedResult, + template.clearBody() + .clearHeaders() + .withBody("Hello") + .to("direct:inout") + .request(Integer.class) + ); + + assertEquals( + expectedResult, + template.clearBody() + .clearHeaders() + .withHeader("foo", "bar") + .withBody("Hello") + .to("direct:inout") + .request(Integer.class) + ); + + assertEquals( + expectedResult, + template.clearBody() + .clearHeaders() + .withBody("Hello") + .to("direct:inout") + .request(Integer.class) + ); + + assertEquals( + expectedResult, + template.clearBody() + .clearHeaders() + .withBody("Hello") + .to(context.getEndpoint("direct:inout")) + .request(Integer.class) + ); + + assertEquals( + expectedResult, + template.clearBody() + .clearHeaders() + .withHeader("foo", "bar") + .withBody("Hello") + .to(context.getEndpoint("direct:inout")) + .request(Integer.class) + ); + + assertEquals( + expectedResult, + template.clearBody() + .clearHeaders() + .withBody("Hello") + .to(context.getEndpoint("direct:inout")) + .request(Integer.class) + ); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + // for faster unit test + errorHandler(noErrorHandler()); + + from("direct:in").process( + exchange -> exchange.getIn().setBody("Bye World")) + .to("mock:result"); + + from("direct:out").process( + exchange -> exchange.getOut().setBody("Bye Bye World")) + .to("mock:result"); + + from("direct:fault").process( + exchange -> { + exchange.getOut().setFault(true); + exchange.getOut().setBody("Faulty World"); + }) + .to("mock:result"); + + from("direct:exception").process( + exchange -> { + throw new IllegalArgumentException("Forced exception by unit test"); + }) + .to("mock:result"); + + from("direct:inout").transform(constant(123)); + } + }; + } +}