Repository: camel
Updated Branches:
  refs/heads/master e4b1a5213 -> 40fa7f865


CAMEL-10438: Java8 DSL for Content Enricher and Aggregator


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/40fa7f86
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/40fa7f86
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/40fa7f86

Branch: refs/heads/master
Commit: 40fa7f86535261a2e685b69ca14c5920ff4e32df
Parents: e4b1a52
Author: lburgazzoli <[email protected]>
Authored: Fri Nov 4 13:33:06 2016 +0100
Committer: lburgazzoli <[email protected]>
Committed: Tue Nov 8 11:02:57 2016 +0100

----------------------------------------------------------------------
 .../builder/AggregationStrategyClause.java      | 116 +++++++++++++++++++
 .../org/apache/camel/builder/EnrichClause.java  |  25 ++++
 .../apache/camel/builder/PredicateClause.java   | 114 ++++++++++++++++++
 .../apache/camel/model/AggregateDefinition.java |  46 ++++++++
 .../apache/camel/model/ProcessorDefinition.java |  74 ++++++++++++
 .../processor/aggregator/AggregateDslTest.java  |  57 +++++++++
 .../processor/enricher/EnricherDslTest.java     |  53 +++++++++
 7 files changed, 485 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
 
b/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
new file mode 100644
index 0000000..c72f904
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.builder;
+
+import java.util.function.BiFunction;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.util.ObjectHelper;
+
+public class AggregationStrategyClause<T> implements AggregationStrategy {
+    private final T parent;
+    private AggregationStrategy strategy;
+
+    public AggregationStrategyClause(T parent) {
+        this.parent = parent;
+        this.strategy = null;
+    }
+
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        return ObjectHelper.notNull(strategy, 
"AggregationStrategy").aggregate(oldExchange, newExchange);
+    }
+
+    // *******************************
+    // Exchange
+    // *******************************
+
+    /**
+     * TODO: document
+     *
+     * Note: this is experimental and subject to changes in future releases.
+     */
+    public T exchange(final BiFunction<Exchange, Exchange, Exchange> function) 
{
+        strategy = function::apply;
+        return parent;
+    }
+
+    // *******************************
+    // Message
+    // *******************************
+
+    /**
+     * TODO: document
+     *
+     * Note: this is experimental and subject to changes in future releases.
+     */
+    public T message(final BiFunction<Message, Message, Message> function) {
+        return exchange((Exchange oldExchange, Exchange newExchange) -> {
+            Message oldMessage = oldExchange != null ? oldExchange.getIn() : 
null;
+            Message newMessage = ObjectHelper.notNull(newExchange, 
"NewExchange").getIn();
+            Message result = function.apply(oldMessage, newMessage);
+
+            if (oldExchange != null) {
+                oldExchange.setIn(result);
+                return oldExchange;
+            } else {
+                newExchange.setIn(result);
+                return newExchange;
+            }
+        });
+    }
+
+    // *******************************
+    // Body
+    // *******************************
+
+    /**
+     * TODO: document
+     *
+     * Note: this is experimental and subject to changes in future releases.
+     */
+    public T body(final BiFunction<Object, Object, Object> function) {
+        return body(Object.class, function);
+    }
+
+    /**
+     * TODO: document
+     *
+     * Note: this is experimental and subject to changes in future releases.
+     */
+    public <B> T body(final Class<B> type, final BiFunction<B, B, B> function) 
{
+        return exchange((Exchange oldExchange, Exchange newExchange) -> {
+            Message oldMessage = oldExchange != null ? oldExchange.getIn() : 
null;
+            Message newMessage = ObjectHelper.notNull(newExchange, 
"NewExchange").getIn();
+
+            B result = function.apply(
+                oldMessage != null ? oldMessage.getBody(type) : null,
+                newMessage != null ? newMessage.getBody(type) : null);
+
+            if (oldExchange != null) {
+                oldExchange.getIn().setBody(result);
+                return oldExchange;
+            } else {
+                newExchange.getIn().setBody(result);
+                return newExchange;
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/builder/EnrichClause.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/builder/EnrichClause.java 
b/camel-core/src/main/java/org/apache/camel/builder/EnrichClause.java
new file mode 100644
index 0000000..bc1f78b
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/builder/EnrichClause.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.model.ProcessorDefinition;
+
+public class EnrichClause<T extends ProcessorDefinition<?>> extends 
AggregationStrategyClause<T> {
+    public EnrichClause(T parent) {
+        super(parent);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/builder/PredicateClause.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/builder/PredicateClause.java 
b/camel-core/src/main/java/org/apache/camel/builder/PredicateClause.java
new file mode 100644
index 0000000..d44d2b3
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/builder/PredicateClause.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.builder;
+
+import java.util.Map;
+import java.util.function.BiPredicate;
+import java.util.function.Predicate;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+public class PredicateClause<T> implements org.apache.camel.Predicate {
+    private final T parent;
+    private Predicate<Exchange> predicate;
+
+    public PredicateClause(T parent) {
+        this.parent = parent;
+        this.predicate = null;
+    }
+
+    @Override
+    public boolean matches(Exchange exchange) {
+        return (predicate != null) ?  predicate.test(exchange) : false;
+    }
+
+    // *******************************
+    // Exchange
+    // *******************************
+
+    /**
+     * TODO: document
+     *
+     * Note: this is experimental and subject to changes in future releases.
+     */
+    public T exchange(final Predicate<Exchange> predicate) {
+        this.predicate = predicate::test;
+        return parent;
+    }
+
+
+    // *******************************
+    // Message
+    // *******************************
+
+    /**
+     * TODO: document
+     *
+     * Note: this is experimental and subject to changes in future releases.
+     */
+    public T message(final Predicate<Message> predicate) {
+        return exchange(e -> predicate.test(e.getIn()));
+    }
+
+    // *******************************
+    // Body
+    // *******************************
+
+    /**
+     * TODO: document
+     *
+     * Note: this is experimental and subject to changes in future releases.
+     */
+    public T body(final Predicate<Object> predicate) {
+        return exchange(e -> predicate.test(e.getIn().getBody()));
+    }
+
+    /**
+     * TODO: document
+     *
+     * Note: this is experimental and subject to changes in future releases.
+     */
+    public <B> T body(final Class<B> type, final Predicate<B> predicate) {
+        return exchange(e -> predicate.test(e.getIn().getBody(type)));
+    }
+
+    /**
+     * TODO: document
+     *
+     * Note: this is experimental and subject to changes in future releases.
+     */
+    public T body(final BiPredicate<Object, Map<String, Object>> predicate) {
+        return exchange(e -> predicate.test(
+            e.getIn().getBody(),
+            e.getIn().getHeaders())
+        );
+    }
+
+    /**
+     * TODO: document
+     *
+     * Note: this is experimental and subject to changes in future releases.
+     */
+    public <B> T body(final Class<B> type, final BiPredicate<B, Map<String, 
Object>> predicate) {
+        return exchange(e -> predicate.test(
+            e.getIn().getBody(type),
+            e.getIn().getHeaders())
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index e14118e..2b5b97c 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -32,7 +32,9 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Expression;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
+import org.apache.camel.builder.AggregationStrategyClause;
 import org.apache.camel.builder.ExpressionClause;
+import org.apache.camel.builder.PredicateClause;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.aggregate.AggregateController;
@@ -780,6 +782,28 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public AggregationStrategyClause<AggregateDefinition> 
aggregationStrategy() {
+        AggregationStrategyClause<AggregateDefinition> clause = new 
AggregationStrategyClause<>(this);
+        setAggregationStrategy(clause);
+        return clause;
+    }
+
+    /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public AggregationStrategyClause<AggregateDefinition> strategy() {
+        return aggregationStrategy();
+    }
+
+    /**
      * Sets the aggregate strategy to use
      *
      * @param aggregationStrategy  the aggregate strategy to use
@@ -872,6 +896,28 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public PredicateClause<AggregateDefinition> completionPredicate() {
+        PredicateClause<AggregateDefinition> clause = new 
PredicateClause<>(this);
+        completionPredicate(clause);
+        return clause;
+    }
+
+    /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public PredicateClause<AggregateDefinition> completion() {
+        return completionPredicate();
+    }
+
+    /**
      * Indicates to complete all current aggregated exchanges when the context 
is stopped
      */
     public AggregateDefinition forceCompletionOnStop() {

http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 2ce1fc6..3c04f4b 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -46,6 +46,7 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.builder.DataFormatClause;
+import org.apache.camel.builder.EnrichClause;
 import org.apache.camel.builder.ExpressionBuilder;
 import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.builder.ProcessClause;
@@ -3319,6 +3320,42 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
     }
 
     /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public EnrichClause<ProcessorDefinition<Type>> enrichWith(String 
resourceUri) {
+        EnrichClause<ProcessorDefinition<Type>> clause = new 
EnrichClause<>(this);
+        enrich(resourceUri, clause);
+        return clause;
+    }
+
+    /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public EnrichClause<ProcessorDefinition<Type>> enrichWith(String 
resourceUri, boolean aggregateOnException) {
+        EnrichClause<ProcessorDefinition<Type>> clause = new 
EnrichClause<>(this);
+        enrich(resourceUri, clause, aggregateOnException, false);
+        return clause;
+    }
+
+    /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public EnrichClause<ProcessorDefinition<Type>> enrichWith(String 
resourceUri, boolean aggregateOnException, boolean shareUnitOfWork) {
+        EnrichClause<ProcessorDefinition<Type>> clause = new 
EnrichClause<>(this);
+        enrich(resourceUri, clause, aggregateOnException, shareUnitOfWork);
+        return clause;
+    }
+
+    /**
      * The <a href="http://camel.apache.org/content-enricher.html";>Content 
Enricher EIP</a>
      * enriches an exchange with additional data obtained from a 
<code>resourceUri</code>.
      *
@@ -3525,6 +3562,43 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
         return pollEnrich(resourceUri, timeout, aggregationStrategyRef, false);
     }
 
+
+    /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(String 
resourceUri) {
+        EnrichClause<ProcessorDefinition<Type>> clause = new 
EnrichClause<>(this);
+        pollEnrich(resourceUri, -1, clause, false);
+        return clause;
+    }
+
+    /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(String 
resourceUri, long timeout) {
+        EnrichClause<ProcessorDefinition<Type>> clause = new 
EnrichClause<>(this);
+        pollEnrich(resourceUri, timeout, clause, false);
+        return clause;
+    }
+
+    /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public EnrichClause<ProcessorDefinition<Type>> pollEnrichWith(String 
resourceUri, long timeout, boolean aggregateOnException) {
+        EnrichClause<ProcessorDefinition<Type>> clause = new 
EnrichClause<>(this);
+        pollEnrich(resourceUri, timeout, clause, aggregateOnException);
+        return clause;
+    }
+
     /**
      * The <a href="http://camel.apache.org/content-enricher.html";>Content 
Enricher EIP</a>
      * enriches an exchange with additional data obtained from a 
<code>resourceUri</code>

http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
new file mode 100644
index 0000000..55fd14e
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class AggregateDslTest extends ContextTestSupport {
+
+    public void testAggregate() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
+
+        for (int i = 0; i < 9; i++) {
+            template.sendBodyAndHeader("direct:start", i, "type", i % 3);
+        }
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate()
+                        .message(m -> m.getHeader("type"))
+                        .strategy()
+                            .body(String.class, (o, n) ->  Stream.of(o, 
n).filter(Objects::nonNull).collect(Collectors.joining(",")))
+                        .completion()
+                            .body(String.class, s -> s.length() == 5)
+                                    .to("mock:aggregated");
+            }
+        };
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/40fa7f86/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherDslTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherDslTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherDslTest.java
new file mode 100644
index 0000000..ca7d8fd
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherDslTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class EnricherDslTest extends ContextTestSupport {
+
+    public void testEnrich() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:enriched");
+        mock.expectedBodiesReceived("res-1", "res-2", "res-3");
+
+        template.sendBody("direct:start", "1");
+        template.sendBody("direct:start", "2");
+        template.sendBody("direct:start", "3");
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .enrichWith("direct:resource")
+                        .body(String.class, (o, n) -> n + o)
+                    .to("mock:enriched");
+
+                // set an empty message
+                from("direct:resource")
+                    .transform()
+                        .body(b -> "res-");
+            }
+        };
+    }
+}

Reply via email to