Repository: camel Updated Branches: refs/heads/master e4b1a5213 -> 40fa7f865
CAMEL-10438: Java8 DSL for Content Enricher and Aggregator Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/40fa7f86 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/40fa7f86 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/40fa7f86 Branch: refs/heads/master Commit: 40fa7f86535261a2e685b69ca14c5920ff4e32df Parents: e4b1a52 Author: lburgazzoli <[email protected]> Authored: Fri Nov 4 13:33:06 2016 +0100 Committer: lburgazzoli <[email protected]> Committed: Tue Nov 8 11:02:57 2016 +0100 ---------------------------------------------------------------------- .../builder/AggregationStrategyClause.java | 116 +++++++++++++++++++ .../org/apache/camel/builder/EnrichClause.java | 25 ++++ .../apache/camel/builder/PredicateClause.java | 114 ++++++++++++++++++ .../apache/camel/model/AggregateDefinition.java | 46 ++++++++ .../apache/camel/model/ProcessorDefinition.java | 74 ++++++++++++ .../processor/aggregator/AggregateDslTest.java | 57 +++++++++ .../processor/enricher/EnricherDslTest.java | 53 +++++++++ 7 files changed, 485 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java b/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java new file mode 100644 index 0000000..c72f904 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.builder; + +import java.util.function.BiFunction; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.util.ObjectHelper; + +public class AggregationStrategyClause<T> implements AggregationStrategy { + private final T parent; + private AggregationStrategy strategy; + + public AggregationStrategyClause(T parent) { + this.parent = parent; + this.strategy = null; + } + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + return ObjectHelper.notNull(strategy, "AggregationStrategy").aggregate(oldExchange, newExchange); + } + + // ******************************* + // Exchange + // ******************************* + + /** + * TODO: document + * + * Note: this is experimental and subject to changes in future releases. + */ + public T exchange(final BiFunction<Exchange, Exchange, Exchange> function) { + strategy = function::apply; + return parent; + } + + // ******************************* + // Message + // ******************************* + + /** + * TODO: document + * + * Note: this is experimental and subject to changes in future releases. + */ + public T message(final BiFunction<Message, Message, Message> function) { + return exchange((Exchange oldExchange, Exchange newExchange) -> { + Message oldMessage = oldExchange != null ? oldExchange.getIn() : null; + Message newMessage = ObjectHelper.notNull(newExchange, "NewExchange").getIn(); + Message result = function.apply(oldMessage, newMessage); + + if (oldExchange != null) { + oldExchange.setIn(result); + return oldExchange; + } else { + newExchange.setIn(result); + return newExchange; + } + }); + } + + // ******************************* + // Body + // ******************************* + + /** + * TODO: document + * + * Note: this is experimental and subject to changes in future releases. + */ + public T body(final BiFunction<Object, Object, Object> function) { + return body(Object.class, function); + } + + /** + * TODO: document + * + * Note: this is experimental and subject to changes in future releases. + */ + public <B> T body(final Class<B> type, final BiFunction<B, B, B> function) { + return exchange((Exchange oldExchange, Exchange newExchange) -> { + Message oldMessage = oldExchange != null ? oldExchange.getIn() : null; + Message newMessage = ObjectHelper.notNull(newExchange, "NewExchange").getIn(); + + B result = function.apply( + oldMessage != null ? oldMessage.getBody(type) : null, + newMessage != null ? newMessage.getBody(type) : null); + + if (oldExchange != null) { + oldExchange.getIn().setBody(result); + return oldExchange; + } else { + newExchange.getIn().setBody(result); + return newExchange; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/builder/EnrichClause.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/EnrichClause.java b/camel-core/src/main/java/org/apache/camel/builder/EnrichClause.java new file mode 100644 index 0000000..bc1f78b --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/builder/EnrichClause.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.builder; + +import org.apache.camel.model.ProcessorDefinition; + +public class EnrichClause<T extends ProcessorDefinition<?>> extends AggregationStrategyClause<T> { + public EnrichClause(T parent) { + super(parent); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/builder/PredicateClause.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/PredicateClause.java b/camel-core/src/main/java/org/apache/camel/builder/PredicateClause.java new file mode 100644 index 0000000..d44d2b3 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/builder/PredicateClause.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.builder; + +import java.util.Map; +import java.util.function.BiPredicate; +import java.util.function.Predicate; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; + +public class PredicateClause<T> implements org.apache.camel.Predicate { + private final T parent; + private Predicate<Exchange> predicate; + + public PredicateClause(T parent) { + this.parent = parent; + this.predicate = null; + } + + @Override + public boolean matches(Exchange exchange) { + return (predicate != null) ? predicate.test(exchange) : false; + } + + // ******************************* + // Exchange + // ******************************* + + /** + * TODO: document + * + * Note: this is experimental and subject to changes in future releases. + */ + public T exchange(final Predicate<Exchange> predicate) { + this.predicate = predicate::test; + return parent; + } + + + // ******************************* + // Message + // ******************************* + + /** + * TODO: document + * + * Note: this is experimental and subject to changes in future releases. + */ + public T message(final Predicate<Message> predicate) { + return exchange(e -> predicate.test(e.getIn())); + } + + // ******************************* + // Body + // ******************************* + + /** + * TODO: document + * + * Note: this is experimental and subject to changes in future releases. + */ + public T body(final Predicate<Object> predicate) { + return exchange(e -> predicate.test(e.getIn().getBody())); + } + + /** + * TODO: document + * + * Note: this is experimental and subject to changes in future releases. + */ + public <B> T body(final Class<B> type, final Predicate<B> predicate) { + return exchange(e -> predicate.test(e.getIn().getBody(type))); + } + + /** + * TODO: document + * + * Note: this is experimental and subject to changes in future releases. + */ + public T body(final BiPredicate<Object, Map<String, Object>> predicate) { + return exchange(e -> predicate.test( + e.getIn().getBody(), + e.getIn().getHeaders()) + ); + } + + /** + * TODO: document + * + * Note: this is experimental and subject to changes in future releases. + */ + public <B> T body(final Class<B> type, final BiPredicate<B, Map<String, Object>> predicate) { + return exchange(e -> predicate.test( + e.getIn().getBody(type), + e.getIn().getHeaders()) + ); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java index e14118e..2b5b97c 100644 --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java @@ -32,7 +32,9 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Expression; import org.apache.camel.Predicate; import org.apache.camel.Processor; +import org.apache.camel.builder.AggregationStrategyClause; import org.apache.camel.builder.ExpressionClause; +import org.apache.camel.builder.PredicateClause; import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.aggregate.AggregateController; @@ -780,6 +782,28 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition } /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public AggregationStrategyClause<AggregateDefinition> aggregationStrategy() { + AggregationStrategyClause<AggregateDefinition> clause = new AggregationStrategyClause<>(this); + setAggregationStrategy(clause); + return clause; + } + + /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public AggregationStrategyClause<AggregateDefinition> strategy() { + return aggregationStrategy(); + } + + /** * Sets the aggregate strategy to use * * @param aggregationStrategy the aggregate strategy to use @@ -872,6 +896,28 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition } /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public PredicateClause<AggregateDefinition> completionPredicate() { + PredicateClause<AggregateDefinition> clause = new PredicateClause<>(this); + completionPredicate(clause); + return clause; + } + + /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public PredicateClause<AggregateDefinition> completion() { + return completionPredicate(); + } + + /** * Indicates to complete all current aggregated exchanges when the context is stopped */ public AggregateDefinition forceCompletionOnStop() { http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 2ce1fc6..3c04f4b 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -46,6 +46,7 @@ import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.builder.DataFormatClause; +import org.apache.camel.builder.EnrichClause; import org.apache.camel.builder.ExpressionBuilder; import org.apache.camel.builder.ExpressionClause; import org.apache.camel.builder.ProcessClause; @@ -3319,6 +3320,42 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public EnrichClause<ProcessorDefinition<Type>> enrichWith(String resourceUri) { + EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this); + enrich(resourceUri, clause); + return clause; + } + + /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public EnrichClause<ProcessorDefinition<Type>> enrichWith(String resourceUri, boolean aggregateOnException) { + EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this); + enrich(resourceUri, clause, aggregateOnException, false); + return clause; + } + + /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public EnrichClause<ProcessorDefinition<Type>> enrichWith(String resourceUri, boolean aggregateOnException, boolean shareUnitOfWork) { + EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this); + enrich(resourceUri, clause, aggregateOnException, shareUnitOfWork); + return clause; + } + + /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code>. * @@ -3525,6 +3562,43 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> return pollEnrich(resourceUri, timeout, aggregationStrategyRef, false); } + + /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(String resourceUri) { + EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this); + pollEnrich(resourceUri, -1, clause, false); + return clause; + } + + /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(String resourceUri, long timeout) { + EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this); + pollEnrich(resourceUri, timeout, clause, false); + return clause; + } + + /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(String resourceUri, long timeout, boolean aggregateOnException) { + EnrichClause<ProcessorDefinition<Type>> clause = new EnrichClause<>(this); + pollEnrich(resourceUri, timeout, clause, aggregateOnException); + return clause; + } + /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code> http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java new file mode 100644 index 0000000..55fd14e --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +public class AggregateDslTest extends ContextTestSupport { + + public void testAggregate() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:aggregated"); + mock.expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8"); + + for (int i = 0; i < 9; i++) { + template.sendBodyAndHeader("direct:start", i, "type", i % 3); + } + + mock.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .aggregate() + .message(m -> m.getHeader("type")) + .strategy() + .body(String.class, (o, n) -> Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(","))) + .completion() + .body(String.class, s -> s.length() == 5) + .to("mock:aggregated"); + } + }; + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherDslTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherDslTest.java new file mode 100644 index 0000000..ca7d8fd --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherDslTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.enricher; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +public class EnricherDslTest extends ContextTestSupport { + + public void testEnrich() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:enriched"); + mock.expectedBodiesReceived("res-1", "res-2", "res-3"); + + template.sendBody("direct:start", "1"); + template.sendBody("direct:start", "2"); + template.sendBody("direct:start", "3"); + + mock.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .enrichWith("direct:resource") + .body(String.class, (o, n) -> n + o) + .to("mock:enriched"); + + // set an empty message + from("direct:resource") + .transform() + .body(b -> "res-"); + } + }; + } +}
