This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit fed5ab34eefbf00a59ab9ea2403b0df4070e311b Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Mon Jun 17 20:59:07 2019 +0200 [CAMEL-8362] Fully integrate the endpoint dsl in the java dsl With support for recipientList, routingSlip, wireTap, enrich, toD --- .../camel/builder/EndpointProducerBuilder.java | 3 + .../apache/camel/model/ProcessorDefinition.java | 463 +++++++++++++++++++-- .../org/apache/camel/model/RouteDefinition.java | 6 + .../java/org/apache/camel/model/ToDefinition.java | 5 + .../apache/camel/model/ToDynamicDefinition.java | 12 + .../org/apache/camel/reifier/ToDynamicReifier.java | 18 +- .../org/apache/camel/reifier/WireTapReifier.java | 6 +- core/camel-endpointdsl/pom.xml | 14 + .../builder/endpoint/AbstractEndpointBuilder.java | 14 +- .../builder/endpoint/EndpointBuilderFactory.java | 21 + .../endpoint/ProcessorDefinitionDslTest.java | 67 +++ .../src/test/resources/log4j2.properties | 49 +++ 12 files changed, 643 insertions(+), 35 deletions(-) diff --git a/core/camel-core/src/main/java/org/apache/camel/builder/EndpointProducerBuilder.java b/core/camel-core/src/main/java/org/apache/camel/builder/EndpointProducerBuilder.java index d003692..b08034b 100644 --- a/core/camel-core/src/main/java/org/apache/camel/builder/EndpointProducerBuilder.java +++ b/core/camel-core/src/main/java/org/apache/camel/builder/EndpointProducerBuilder.java @@ -18,6 +18,7 @@ package org.apache.camel.builder; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; +import org.apache.camel.Expression; import org.apache.camel.NoSuchEndpointException; public interface EndpointProducerBuilder { @@ -28,4 +29,6 @@ public interface EndpointProducerBuilder { void setProperty(String name, Object value); + Expression expr(); + } diff --git a/core/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index b6439c5..ce50486 100644 --- 
a/core/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/core/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -265,6 +265,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * Sends the exchange to the given dynamic endpoint * + * @param endpointProducerBuilder the dynamic endpoint to send to (resolved using simple language by default) + * @return the builder + */ + public Type toD(@AsEndpointUri EndpointProducerBuilder endpointProducerBuilder) { + ToDynamicDefinition answer = new ToDynamicDefinition(); + answer.setEndpointProducerBuilder(endpointProducerBuilder); + addOutput(answer); + return asType(); + } + + /** + * Sends the exchange to the given dynamic endpoint + * * @param uri the dynamic endpoint to send to (resolved using simple language by default) * @param cacheSize sets the maximum size used by the {@link org.apache.camel.spi.ConsumerCache} which is used to cache and reuse producers. * @@ -281,6 +294,22 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * Sends the exchange to the given dynamic endpoint * + * @param endpointProducerBuilder the dynamic endpoint to send to (resolved using simple language by default) + * @param cacheSize sets the maximum size used by the {@link org.apache.camel.spi.ConsumerCache} which is used to cache and reuse producers. 
+ * + * @return the builder + */ + public Type toD(@AsEndpointUri EndpointProducerBuilder endpointProducerBuilder, int cacheSize) { + ToDynamicDefinition answer = new ToDynamicDefinition(); + answer.setEndpointProducerBuilder(endpointProducerBuilder); + answer.setCacheSize(cacheSize); + addOutput(answer); + return asType(); + } + + /** + * Sends the exchange to the given dynamic endpoint + * * @param uri the dynamic endpoint to send to (resolved using simple language by default) * @param ignoreInvalidEndpoint ignore the invalidate endpoint exception when try to create a producer with that endpoint * @return the builder @@ -294,6 +323,21 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * Sends the exchange to the given dynamic endpoint + * + * @param endpointProducerBuilder the dynamic endpoint to send to (resolved using simple language by default) + * @param ignoreInvalidEndpoint ignore the invalidate endpoint exception when try to create a producer with that endpoint + * @return the builder + */ + public Type toD(@AsEndpointUri EndpointProducerBuilder endpointProducerBuilder, boolean ignoreInvalidEndpoint) { + ToDynamicDefinition answer = new ToDynamicDefinition(); + answer.setEndpointProducerBuilder(endpointProducerBuilder); + answer.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint); + addOutput(answer); + return asType(); + } + + /** * Sends the exchange to the given endpoint * * @param uri the String formatted endpoint uri to send to @@ -355,7 +399,13 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> return asType(); } - public Type to(EndpointProducerBuilder endpoint) { + /** + * Sends the exchange to the given endpoint + * + * @param endpoint the endpoint to send to + * @return the builder + */ + public Type to(@AsEndpointUri EndpointProducerBuilder endpoint) { addOutput(new ToDefinition(endpoint)); return asType(); } @@ -372,7 +422,7 @@ public abstract class ProcessorDefinition<Type 
extends ProcessorDefinition<Type> public Type to(ExchangePattern pattern, @AsEndpointUri String uri) { addOutput(new ToDefinition(uri, pattern)); return asType(); - } + } /** * Sends the exchange with certain exchange pattern to the given endpoint @@ -389,6 +439,20 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * Sends the exchange with certain exchange pattern to the given endpoint + * <p/> + * Notice the existing MEP is preserved + * + * @param pattern the pattern to use for the message exchange + * @param endpoint the endpoint to send to + * @return the builder + */ + public Type to(ExchangePattern pattern, EndpointProducerBuilder endpoint) { + addOutput(new ToDefinition(endpoint, pattern)); + return asType(); + } + + /** * Sends the exchange to a list of endpoints * * @param uris list of endpoints to send to @@ -429,6 +493,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * Sends the exchange to a list of endpoints + * + * @param endpoints list of endpoints to send to + * @return the builder + */ + public Type to(@AsEndpointUri EndpointProducerBuilder... endpoints) { + for (EndpointProducerBuilder endpoint : endpoints) { + addOutput(new ToDefinition(endpoint)); + } + return asType(); + } + + /** + * Sends the exchange to a list of endpoints * <p/> * Notice the existing MEP is preserved * @@ -474,6 +551,22 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * Sends the exchange to a list of endpoints + * <p/> + * Notice the existing MEP is preserved + * + * @param pattern the pattern to use for the message exchanges + * @param endpoints list of endpoints to send to + * @return the builder + */ + public Type to(ExchangePattern pattern, @AsEndpointUri EndpointProducerBuilder... 
endpoints) { + for (EndpointProducerBuilder endpoint : endpoints) { + addOutput(new ToDefinition(endpoint, pattern)); + } + return asType(); + } + + /** * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a> * set the {@link ExchangePattern} into the {@link Exchange}. * <p/> @@ -1983,6 +2076,22 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * destination gets a copy of the original message to avoid the processors * interfering with each other using {@link ExchangePattern#InOnly}. * + * @param endpoint the endpoint to wiretap to + * @return the builder + */ + public WireTapDefinition<Type> wireTap(@AsEndpointUri EndpointProducerBuilder endpoint) { + WireTapDefinition answer = new WireTapDefinition(); + answer.setEndpointProducerBuilder(endpoint); + addOutput(answer); + return answer; + } + + /** + * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a> + * Sends messages to all its child outputs; so that each processor and + * destination gets a copy of the original message to avoid the processors + * interfering with each other using {@link ExchangePattern#InOnly}. + * * @param uri the dynamic endpoint to wiretap to (resolved using simple language by default) * @return the builder */ @@ -2704,7 +2813,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code>. - * + * * @param resourceUri URI of resource endpoint for obtaining additional data. * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. 
* @return the builder @@ -2716,9 +2825,37 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code>. + * <p/> + * The difference between this and {@link #pollEnrich(String)} is that this uses a producer + * to obatin the additional data, where as pollEnrich uses a polling consumer. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @return the builder + * @see org.apache.camel.processor.Enricher + */ + public Type enrich(@AsEndpointUri EndpointProducerBuilder resourceUri) { + return enrich(resourceUri, null); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code>. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @return the builder + * @see org.apache.camel.processor.Enricher + */ + public Type enrich(@AsEndpointUri EndpointProducerBuilder resourceUri, AggregationStrategy aggregationStrategy) { + return enrich(resourceUri, aggregationStrategy, false); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code> and * with an aggregation strategy created using a fluent builder. 
- * + * * <blockquote><pre>{@code * fom("direct:start") * .enrichWith("direct:resource") @@ -2759,6 +2896,47 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> and + * with an aggregation strategy created using a fluent builder. + * + * <blockquote><pre>{@code + * fom("direct:start") + * .enrichWith("direct:resource") + * .body(String.class, (o, n) -> n + o); + * }</pre></blockquote> + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @return the builder + * @see org.apache.camel.processor.Enricher + */ + public EnrichClause<ProcessorDefinition<Type>> enrichWith(@AsEndpointUri EndpointProducerBuilder resourceUri) { + return enrichWith(resourceUri.getUri()); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> and + * with an aggregation strategy created using a fluent builder. + */ + public EnrichClause<ProcessorDefinition<Type>> enrichWith(@AsEndpointUri EndpointProducerBuilder resourceUri, boolean aggregateOnException) { + EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this); + enrich(resourceUri, clause, aggregateOnException, false); + return clause; + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> and + * with an aggregation strategy created using a fluent builder. 
+ */ + public EnrichClause<ProcessorDefinition<Type>> enrichWith(@AsEndpointUri EndpointProducerBuilder resourceUri, boolean aggregateOnException, boolean shareUnitOfWork) { + EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this); + enrich(resourceUri, clause, aggregateOnException, shareUnitOfWork); + return clause; + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code>. * * @param resourceUri URI of resource endpoint for obtaining additional data. @@ -2797,6 +2975,43 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code>. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @param aggregateOnException whether to call {@link AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. + * @return the builder + * @see org.apache.camel.processor.Enricher + */ + public Type enrich(@AsEndpointUri EndpointProducerBuilder resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException) { + return enrich(resourceUri, aggregationStrategy, aggregateOnException, false); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code>. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. 
+ * @param aggregateOnException whether to call {@link AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. + * @param shareUnitOfWork whether to share unit of work + * @return the builder + * @see org.apache.camel.processor.Enricher + */ + public Type enrich(@AsEndpointUri EndpointProducerBuilder resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException, boolean shareUnitOfWork) { + EnrichDefinition answer = new EnrichDefinition(); + answer.setExpression(resourceUri.expr()); + answer.setAggregationStrategy(aggregationStrategy); + answer.setAggregateOnException(aggregateOnException); + answer.setShareUnitOfWork(shareUnitOfWork); + addOutput(answer); + return asType(); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code>. * <p/> * The difference between this and {@link #pollEnrich(String)} is that this uses a producer * to obtain the additional data, where as pollEnrich uses a polling consumer. @@ -2896,14 +3111,95 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * This method will <tt>block</tt> until data is available, use the method with timeout if you do not + * want to risk waiting a long time before data is available from the resourceUri. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. 
+ * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + public Type pollEnrich(EndpointProducerBuilder resourceUri) { + return pollEnrich(resourceUri.getUri()); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * This method will <b>block</b> until data is available, use the method with timeout if you do not + * want to risk waiting a long time before data is available from the resourceUri. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + public Type pollEnrich(EndpointProducerBuilder resourceUri, AggregationStrategy aggregationStrategy) { + return pollEnrich(resourceUri, -1, aggregationStrategy); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}. + * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt> + * otherwise we use <tt>receive(timeout)</tt>. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. 
+ * @param timeout timeout in millis to wait at most for data to be available. + * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + public Type pollEnrich(EndpointProducerBuilder resourceUri, long timeout, AggregationStrategy aggregationStrategy) { + return pollEnrich(resourceUri, timeout, aggregationStrategy, false); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}. + * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt> + * otherwise we use <tt>receive(timeout)</tt>. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param timeout timeout in millis to wait at most for data to be available. + * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data. + * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + public Type pollEnrich(EndpointProducerBuilder resourceUri, long timeout, String aggregationStrategyRef) { + return pollEnrich(resourceUri, timeout, aggregationStrategyRef, false); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code> * and with an aggregation strategy created using a fluent builder using * a {@link org.apache.camel.PollingConsumer} to poll the endpoint. 
*/ public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(@AsEndpointUri String resourceUri) { - EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this); - pollEnrich(resourceUri, -1, clause, false); - return clause; + return pollEnrichWith(resourceUri, -1); } /** @@ -2913,9 +3209,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * a {@link org.apache.camel.PollingConsumer} to poll the endpoint. */ public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(@AsEndpointUri String resourceUri, long timeout) { - EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this); - pollEnrich(resourceUri, timeout, clause, false); - return clause; + return pollEnrichWith(resourceUri, timeout, false); } /** @@ -2933,6 +3227,38 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * and with an aggregation strategy created using a fluent builder using + * a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + */ + public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(EndpointProducerBuilder resourceUri) { + return pollEnrichWith(resourceUri, -1); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * and with an aggregation strategy created using a fluent builder using + * a {@link org.apache.camel.PollingConsumer} to poll the endpoint. 
+ */ + public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(EndpointProducerBuilder resourceUri, long timeout) { + return pollEnrichWith(resourceUri, timeout, false); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * and with an aggregation strategy created using a fluent builder using + * a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + */ + public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(EndpointProducerBuilder resourceUri, long timeout, boolean aggregateOnException) { + EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this); + pollEnrich(resourceUri, timeout, clause, aggregateOnException); + return clause; + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. 
* <p/> * The difference between this and {@link #enrich(String)} is that this uses a consumer @@ -2951,13 +3277,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @see org.apache.camel.processor.PollEnricher */ public Type pollEnrich(@AsEndpointUri String resourceUri, long timeout, AggregationStrategy aggregationStrategy, boolean aggregateOnException) { - PollEnrichDefinition pollEnrich = new PollEnrichDefinition(); - pollEnrich.setExpression(new ConstantExpression(resourceUri)); - pollEnrich.setTimeout(timeout); - pollEnrich.setAggregationStrategy(aggregationStrategy); - pollEnrich.setAggregateOnException(aggregateOnException); - addOutput(pollEnrich); - return asType(); + return pollEnrich(new ConstantExpression(resourceUri), timeout, aggregationStrategy, aggregateOnException); } /** @@ -2981,13 +3301,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @see org.apache.camel.processor.PollEnricher */ public Type pollEnrich(@AsEndpointUri String resourceUri, long timeout, String aggregationStrategyRef, boolean aggregateOnException) { - PollEnrichDefinition pollEnrich = new PollEnrichDefinition(); - pollEnrich.setExpression(new ConstantExpression(resourceUri)); - pollEnrich.setTimeout(timeout); - pollEnrich.setAggregationStrategyRef(aggregationStrategyRef); - pollEnrich.setAggregateOnException(aggregateOnException); - addOutput(pollEnrich); - return asType(); + return pollEnrich(new ConstantExpression(resourceUri), timeout, aggregationStrategyRef, aggregateOnException); } /** @@ -3023,6 +3337,75 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt> * otherwise we use <tt>receive(timeout)</tt>. * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param timeout timeout in millis to wait at most for data to be available. 
+ * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @param aggregateOnException whether to call {@link AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. + * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + public Type pollEnrich(@AsEndpointUri EndpointProducerBuilder resourceUri, long timeout, AggregationStrategy aggregationStrategy, boolean aggregateOnException) { + return pollEnrich(resourceUri.expr(), timeout, aggregationStrategy, aggregateOnException); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}. + * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt> + * otherwise we use <tt>receive(timeout)</tt>. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param timeout timeout in millis to wait at most for data to be available. + * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data. + * @param aggregateOnException whether to call {@link AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. 
+ * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + public Type pollEnrich(@AsEndpointUri EndpointProducerBuilder resourceUri, long timeout, String aggregationStrategyRef, boolean aggregateOnException) { + return pollEnrich(resourceUri.expr(), timeout, aggregationStrategyRef, aggregateOnException); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}. + * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt> + * otherwise we use <tt>receive(timeout)</tt>. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param timeout timeout in millis to wait at most for data to be available. + * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + public Type pollEnrich(@AsEndpointUri EndpointProducerBuilder resourceUri, long timeout) { + return pollEnrich(resourceUri, timeout, (String) null); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}. + * If timeout is negative, we use <tt>receive</tt>. 
If timeout is 0 then we use <tt>receiveNoWait</tt> + * otherwise we use <tt>receive(timeout)</tt>. + * * @param expression to use an expression to dynamically compute the endpoint to poll from * @param timeout timeout in millis to wait at most for data to be available. * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data. @@ -3033,7 +3416,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> */ public Type pollEnrich(@AsEndpointUri Expression expression, long timeout, String aggregationStrategyRef, boolean aggregateOnException) { PollEnrichDefinition pollEnrich = new PollEnrichDefinition(); - pollEnrich.setExpression(new ExpressionDefinition(expression)); + pollEnrich.setExpression(expression); pollEnrich.setTimeout(timeout); pollEnrich.setAggregationStrategyRef(aggregationStrategyRef); pollEnrich.setAggregateOnException(aggregateOnException); @@ -3053,6 +3436,36 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt> * otherwise we use <tt>receive(timeout)</tt>. * + * @param expression to use an expression to dynamically compute the endpoint to poll from + * @param timeout timeout in millis to wait at most for data to be available. + * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @param aggregateOnException whether to call {@link AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. 
+ * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + public Type pollEnrich(@AsEndpointUri Expression expression, long timeout, AggregationStrategy aggregationStrategy, boolean aggregateOnException) { + PollEnrichDefinition pollEnrich = new PollEnrichDefinition(); + pollEnrich.setExpression(expression); + pollEnrich.setTimeout(timeout); + pollEnrich.setAggregationStrategy(aggregationStrategy); + pollEnrich.setAggregateOnException(aggregateOnException); + addOutput(pollEnrich); + return asType(); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}. + * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt> + * otherwise we use <tt>receive(timeout)</tt>. 
+ * * @return a expression builder clause to set the expression to use for computing the endpoint to poll from * @see org.apache.camel.processor.PollEnricher */ diff --git a/core/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java index 4489e25..338cf4d 100644 --- a/core/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java +++ b/core/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java @@ -169,6 +169,12 @@ public class RouteDefinition extends ProcessorDefinition<RouteDefinition> { return this; } + /** + * Creates an input to the route + * + * @param endpoint the from endpoint + * @return the builder + */ public RouteDefinition from(EndpointConsumerBuilder endpoint) { setInput(new FromDefinition(endpoint)); return this; diff --git a/core/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java index 2a7ee92..78828ad 100644 --- a/core/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java +++ b/core/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java @@ -61,6 +61,11 @@ public class ToDefinition extends SendDefinition<ToDefinition> { this.pattern = pattern; } + public ToDefinition(EndpointProducerBuilder endpoint, ExchangePattern pattern) { + this(endpoint); + this.pattern = pattern; + } + @Override public String getShortName() { return "to"; diff --git a/core/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java index 6f3c2ce..165376e 100644 --- a/core/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java +++ b/core/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java @@ -20,8 +20,10 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import 
javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.ExchangePattern; +import org.apache.camel.builder.EndpointProducerBuilder; import org.apache.camel.spi.Metadata; /** @@ -40,6 +42,8 @@ public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition> @XmlAttribute @Metadata(required = true) private String uri; + @XmlTransient + protected EndpointProducerBuilder endpointProducerBuilder; @XmlAttribute private ExchangePattern pattern; @XmlAttribute @@ -122,6 +126,14 @@ public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition> this.uri = uri; } + public EndpointProducerBuilder getEndpointProducerBuilder() { + return endpointProducerBuilder; + } + + public void setEndpointProducerBuilder(EndpointProducerBuilder endpointProducerBuilder) { + this.endpointProducerBuilder = endpointProducerBuilder; + } + public ExchangePattern getPattern() { return pattern; } diff --git a/core/camel-core/src/main/java/org/apache/camel/reifier/ToDynamicReifier.java b/core/camel-core/src/main/java/org/apache/camel/reifier/ToDynamicReifier.java index 0b1d264..042fede 100644 --- a/core/camel-core/src/main/java/org/apache/camel/reifier/ToDynamicReifier.java +++ b/core/camel-core/src/main/java/org/apache/camel/reifier/ToDynamicReifier.java @@ -40,11 +40,17 @@ class ToDynamicReifier<T extends ToDynamicDefinition> extends ProcessorReifier<T @Override public Processor createProcessor(RouteContext routeContext) throws Exception { - StringHelper.notEmpty(definition.getUri(), "uri", this); - - Expression exp = createExpression(routeContext); + String uri; + Expression exp; + if (definition.getEndpointProducerBuilder() != null) { + uri = definition.getEndpointProducerBuilder().getUri(); + exp = definition.getEndpointProducerBuilder().expr(); + } else { + uri = StringHelper.notEmpty(definition.getUri(), "uri", this); + exp = createExpression(routeContext, 
uri); + } - SendDynamicProcessor processor = new SendDynamicProcessor(definition.getUri(), exp); + SendDynamicProcessor processor = new SendDynamicProcessor(uri, exp); processor.setCamelContext(routeContext.getCamelContext()); processor.setPattern(definition.getPattern()); if (definition.getCacheSize() != null) { @@ -56,10 +62,10 @@ class ToDynamicReifier<T extends ToDynamicDefinition> extends ProcessorReifier<T return processor; } - protected Expression createExpression(RouteContext routeContext) { + protected Expression createExpression(RouteContext routeContext, String uri) { List<Expression> list = new ArrayList<>(); - String[] parts = safeSplitRaw(definition.getUri()); + String[] parts = safeSplitRaw(uri); for (String part : parts) { // the part may have optional language to use, so you can mix languages String value = StringHelper.after(part, "language:"); diff --git a/core/camel-core/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core/src/main/java/org/apache/camel/reifier/WireTapReifier.java index b889785..7b0f412 100644 --- a/core/camel-core/src/main/java/org/apache/camel/reifier/WireTapReifier.java +++ b/core/camel-core/src/main/java/org/apache/camel/reifier/WireTapReifier.java @@ -88,12 +88,12 @@ class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { } @Override - protected Expression createExpression(RouteContext routeContext) { + protected Expression createExpression(RouteContext routeContext, String uri) { // whether to use dynamic or static uri if (definition.isDynamic()) { - return super.createExpression(routeContext); + return super.createExpression(routeContext, uri); } else { - return ExpressionBuilder.constantExpression(definition.getUri()); + return ExpressionBuilder.constantExpression(uri); } } diff --git a/core/camel-endpointdsl/pom.xml b/core/camel-endpointdsl/pom.xml index 63f9b65..ed085c3 100644 --- a/core/camel-endpointdsl/pom.xml +++ b/core/camel-endpointdsl/pom.xml @@ -61,6 +61,10 @@ 
<artifactId>camel-util-json</artifactId> </dependency> <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-jaxp</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> @@ -109,6 +113,16 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <scope>test</scope> + </dependency> </dependencies> diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/AbstractEndpointBuilder.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/AbstractEndpointBuilder.java index 518c756..b243ac1 100644 --- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/AbstractEndpointBuilder.java +++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/AbstractEndpointBuilder.java @@ -25,9 +25,13 @@ import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Expression; import org.apache.camel.NoSuchEndpointException; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.model.language.SimpleExpression; import org.apache.camel.spi.RouteContext; +import org.apache.camel.support.ExpressionAdapter; import org.apache.camel.util.URISupport; @XmlTransient @@ -49,7 +53,7 @@ public class AbstractEndpointBuilder { if (endpoint == null) { throw new NoSuchEndpointException(uri); } - return null; + return endpoint; } public String getUri() { @@ -89,4 +93,12 @@ public class AbstractEndpointBuilder { public void setProperty(String key, Object value) { this.properties.put(key, value); } + + /** + * Builds a {@link SimpleExpression} from this builder's URI ({@link #getUri()}) + * 
@return a {@link SimpleExpression} built from {@link #getUri()} + */ + public Expression expr() { + return new SimpleExpression(getUri()); + } } diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/EndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/EndpointBuilderFactory.java index 9faa1f4..9d3ec10 100644 --- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/EndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/EndpointBuilderFactory.java @@ -17,7 +17,15 @@ package org.apache.camel.builder.endpoint; // CHECKSTYLE:OFF +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.camel.Exchange; +import org.apache.camel.Expression; +import org.apache.camel.builder.EndpointProducerBuilder; import org.apache.camel.builder.endpoint.dsl.*; +import org.apache.camel.support.ExpressionAdapter; public interface EndpointBuilderFactory extends AMQPEndpointBuilderFactory, @@ -317,4 +325,17 @@ public interface EndpointBuilderFactory extends ZooKeeperEndpointBuilderFactory, ZooKeeperMasterEndpointBuilderFactory { + + default Expression endpoints(EndpointProducerBuilder... 
endpoints) { + return new ExpressionAdapter() { + List<Expression> expressions = Stream.of(endpoints) + .map(EndpointProducerBuilder::expr).collect(Collectors.toList()); + @Override + public Object evaluate(Exchange exchange) { + return expressions.stream().map(e -> e.evaluate(exchange, Object.class)) + .collect(Collectors.toList()); + } + }; + } + } diff --git a/core/camel-endpointdsl/src/test/java/org/apache/camel/builder/endpoint/ProcessorDefinitionDslTest.java b/core/camel-endpointdsl/src/test/java/org/apache/camel/builder/endpoint/ProcessorDefinitionDslTest.java new file mode 100644 index 0000000..34fa6cb --- /dev/null +++ b/core/camel-endpointdsl/src/test/java/org/apache/camel/builder/endpoint/ProcessorDefinitionDslTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.builder.endpoint; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ResolveEndpointFailedException; +import org.apache.camel.builder.EndpointProducerBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +public class ProcessorDefinitionDslTest extends ContextTestSupport { + + @Test + public void testFlow() throws Exception { + MockEndpoint m1 = getMockEndpoint("mock:m1"); + m1.expectedMessageCount(1); + + MockEndpoint m2 = getMockEndpoint("mock:m2"); + m2.expectedMessageCount(1); + + MockEndpoint m3 = getMockEndpoint("mock:m3"); + m3.expectedMessageCount(1); + + template.requestBody("direct:a", "Hello World", String.class); + + assertMockEndpointsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() { + return new EndpointRouteBuilder() { + public void configure() { + from(direct("a")) + .recipientList(endpoints(mock("m1"), direct("b"))) + .routingSlip(endpoints(mock("m2"), direct("c"))) + .wireTap(direct("d")) + .enrich(direct("e")) + .toD(mock("${header.next}")); + + from(direct("b")).to(log("endpoint.b")); + from(direct("c")).to(log("endpoint.c")); + from(direct("d")).to(log("endpoint.d")); + from(direct("e")) + .setBody(constant("body")) + .setHeader("next", constant("m3")); + + + } + }; + } + + +} diff --git a/core/camel-endpointdsl/src/test/resources/log4j2.properties b/core/camel-endpointdsl/src/test/resources/log4j2.properties new file mode 100644 index 0000000..a5b53e0 --- /dev/null +++ b/core/camel-endpointdsl/src/test/resources/log4j2.properties @@ -0,0 +1,49 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. 
+## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +appender.console.type = Console +appender.console.name = console +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n + +appender.file.type = File +appender.file.name = file +appender.file.fileName = target/camel-core-test.log +appender.file.append = true +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n + +appender.file2.type = File +appender.file2.name = file2 +appender.file2.fileName = target/custom-logger-test.log +appender.file2.append = false +appender.file2.layout.type = PatternLayout +appender.file2.layout.pattern = %-5p %c{1} %m%n + +logger.customlogger.name = org.apache.camel.customlogger +logger.customlogger.level = TRACE +logger.customlogger.appenderRef.file2.ref = file2 + +logger.file-cluster.name = org.apache.camel.component.file.cluster +logger.file-cluster.level = DEBUG + +rootLogger.level = INFO + +rootLogger.appenderRef.file.ref = file +#rootLogger.appenderRef.console.ref = console + +#logger.camel-core.name = org.apache.camel +#logger.camel-core.level = TRACE