This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 6361af15ab3ed0acfb4c92aa043daa57b079e872 Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Sun Nov 24 11:20:04 2019 +0100 YAML: add support for Aggregate EIP --- .../k/loader/yaml/parser/AggregateStepParser.java | 64 ++++++++++++++++++ ...efinitionTest.groovy => DefinitionsTest.groovy} | 2 +- .../yaml/{RouteTest.groovy => RoutesTest.groovy} | 70 +++++++++----------- .../apache/camel/k/loader/yaml/TestSupport.groovy | 38 ++++++++++- .../k/loader/yaml/parser/AggregateTest.groovy | 75 ++++++++++++++++++++++ .../resources/routes/RoutesTest_aggregator.yaml | 25 ++++++++ .../test/resources/routes/RoutesTest_filter.yaml | 30 +++++++++ .../test/resources/routes/RoutesTest_split.yaml | 30 +++++++++ 8 files changed, 293 insertions(+), 41 deletions(-) diff --git a/camel-k-loader-yaml/src/main/java/org/apache/camel/k/loader/yaml/parser/AggregateStepParser.java b/camel-k-loader-yaml/src/main/java/org/apache/camel/k/loader/yaml/parser/AggregateStepParser.java new file mode 100644 index 0000000..52b192b --- /dev/null +++ b/camel-k-loader-yaml/src/main/java/org/apache/camel/k/loader/yaml/parser/AggregateStepParser.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.k.loader.yaml.parser; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.camel.Expression; +import org.apache.camel.k.annotation.yaml.YAMLStepParser; +import org.apache.camel.k.loader.yaml.model.Step; +import org.apache.camel.model.AggregateDefinition; +import org.apache.camel.model.ExpressionSubElementDefinition; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.language.ExpressionDefinition; +import org.apache.camel.reifier.AggregateReifier; +import org.apache.camel.reifier.ProcessorReifier; + +@YAMLStepParser("aggregate") +public class AggregateStepParser implements ProcessorStepParser { + static { + ProcessorReifier.registerReifier(Definition.class, AggregateReifier::new); + } + + @Override + public ProcessorDefinition<?> toProcessor(Context context) { + return context.node(Definition.class); + } + + public static final class Definition extends AggregateDefinition implements HasExpression, Step.Definition { + @JsonIgnore + public void setExpression(Expression expression) { + super.setExpression(expression); + } + + public void setCorrelationExpression(CorrelationExpression correlationExpression) { + super.setCorrelationExpression(correlationExpression); + } + } + + public static final class CorrelationExpression extends ExpressionSubElementDefinition implements HasExpression { + @Override + public void setExpression(ExpressionDefinition expressionDefinition) { + super.setExpressionType(expressionDefinition); + } + + @Override + public ExpressionDefinition getExpression() { + return super.getExpressionType(); + } + } +} + diff --git a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteDefinitionTest.groovy b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/DefinitionsTest.groovy similarity index 99% rename from camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteDefinitionTest.groovy rename to camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/DefinitionsTest.groovy index b63d4ba..0fe243d 100644 --- a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteDefinitionTest.groovy +++ b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/DefinitionsTest.groovy @@ -27,7 +27,7 @@ import org.apache.commons.io.IOUtils import java.nio.charset.StandardCharsets -class RouteDefinitionTest extends TestSupport { +class DefinitionsTest extends TestSupport { def "route with id"() { given: diff --git a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteTest.groovy b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RoutesTest.groovy similarity index 63% rename from camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteTest.groovy rename to camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RoutesTest.groovy index 72dd75a..23fd49b 100644 --- a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteTest.groovy +++ b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RoutesTest.groovy @@ -18,27 +18,13 @@ package org.apache.camel.k.loader.yaml import org.apache.camel.component.mock.MockEndpoint +import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy -class RouteTest extends TestSupport { +class RoutesTest extends TestSupport { - def 'test split'() { + def 'split'() { setup: - def context = startContext(''' - - from: - uri: "direct:route" - steps: - - split: - tokenize: "," - steps: - - to: "mock:split" - - to: "mock:route" - - from: - uri: "direct:flow" - steps: - - split: - tokenize: "," - - to: "mock:flow" - ''') + def context = startContext() mockEndpoint(context,'mock:split') { expectedMessageCount = 3 @@ -63,35 +49,20 @@ class RouteTest extends TestSupport { context?.stop() } - def 'test filter'() { + def 'filter'() { setup: - def context = startContext(''' - - from: - uri: "direct:route" - steps: - - filter: - simple: "${body.startsWith(\\"a\\")}" - steps: - - to: "mock:filter" - - to: "mock:route" - - from: - uri: "direct:flow" - steps: - - filter: - simple: "${body.startsWith(\\"a\\")}" - - to: "mock:flow" - ''') + def context = startContext() mockEndpoint(context, 'mock:route') { - expectedMessageCount = 2 + expectedMessageCount 2 expectedBodiesReceived 'a', 'b' } mockEndpoint(context, 'mock:filter') { - expectedMessageCount = 1 + expectedMessageCount 1 expectedBodiesReceived 'a' } mockEndpoint(context,'mock:flow') { - expectedMessageCount = 1 + expectedMessageCount 1 expectedBodiesReceived 'a' } when: @@ -106,4 +77,27 @@ class RouteTest extends TestSupport { cleanup: context?.stop() } + + def 'aggregator'() { + setup: + def context = startContext([ + 'aggregatorStrategy': new UseLatestAggregationStrategy() + ]) + + mockEndpoint(context, 'mock:route') { + expectedMessageCount 2 + expectedBodiesReceived '2', '4' + } + when: + context.createProducerTemplate().with { + sendBodyAndHeader('direct:route', '1', 'StockSymbol', 1) + sendBodyAndHeader('direct:route', '2', 'StockSymbol', 1) + sendBodyAndHeader('direct:route', '3', 'StockSymbol', 2) + sendBodyAndHeader('direct:route', '4', 'StockSymbol', 2) + } + then: + MockEndpoint.assertIsSatisfied(context) + cleanup: + context?.stop() + } } diff --git a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/TestSupport.groovy b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/TestSupport.groovy index 400c59f..fea7a41 100644 --- a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/TestSupport.groovy +++ b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/TestSupport.groovy @@ -43,9 +43,29 @@ class TestSupport extends Specification { } static CamelContext startContext(String content) { + return startContext(content, [:]) + } + + static CamelContext startContext(String content, Map<String, Object> beans) { + return startContext( + IOUtils.toInputStream(content.stripMargin(), StandardCharsets.UTF_8), + beans + ) + } + + static CamelContext startContext(InputStream content) { + return startContext(content, [:]) + } + + static CamelContext startContext(InputStream content, Map<String, Object> beans) { def context = new DefaultCamelContext() - def istream = IOUtils.toInputStream(content.stripMargin(), StandardCharsets.UTF_8) - def builder = new YamlSourceLoader().builder(istream) + def builder = new YamlSourceLoader().builder(content) + + if (beans) { + beans.each { + k, v -> context.registry.bind(k, v) + } + } context.disableJMX() context.setStreamCaching(true) @@ -55,6 +75,20 @@ class TestSupport extends Specification { return context } + CamelContext startContext() { + return startContext([:]) + } + + CamelContext startContext(Map<String, Object> beans) { + def name = specificationContext.currentIteration.name.replace(' ', '_') + def path = "/routes/${specificationContext.currentSpec.name}_${name}.yaml" + + return startContext( + TestSupport.class.getResourceAsStream(path) as InputStream, + beans + ) + } + static MockEndpoint mockEndpoint( CamelContext context, String uri, diff --git a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/parser/AggregateTest.groovy b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/parser/AggregateTest.groovy new file mode 100644 index 0000000..8335dcf --- /dev/null +++ b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/parser/AggregateTest.groovy @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.k.loader.yaml.parser + +import org.apache.camel.k.loader.yaml.TestSupport +import org.apache.camel.model.AggregateDefinition +import org.apache.camel.model.language.SimpleExpression + +class AggregateTest extends TestSupport { + + def "definition"() { + given: + def stepContext = stepContext(''' + expression: + simple: "${header.ID}" + correlation-expression: + simple: "${header.Count}" + strategy-ref: "myAppender" + completion-size: 10 + ''') + when: + def processor = new AggregateStepParser().toProcessor(stepContext) + then: + with(processor, AggregateDefinition) { + strategyRef == 'myAppender' + completionSize == 10 + + with(expression, SimpleExpression) { + expression == '${header.ID}' + } + with(correlationExpression?.expressionType, SimpleExpression) { + expression == '${header.Count}' + } + } + } + + def "compact efinition"() { + given: + def stepContext = stepContext(''' + simple: "${header.ID}" + correlation-expression: + simple: "${header.Count}" + strategy-ref: "myAppender" + completion-size: 10 + ''') + when: + def processor = new AggregateStepParser().toProcessor(stepContext) + then: + with(processor, AggregateDefinition) { + strategyRef == 'myAppender' + completionSize == 10 + + with(expression, SimpleExpression) { + expression == '${header.ID}' + } + with(correlationExpression?.expressionType, SimpleExpression) { + expression == '${header.Count}' + } + } + } +} diff --git a/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_aggregator.yaml b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_aggregator.yaml new file mode 100644 index 0000000..1634826 --- /dev/null +++ b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_aggregator.yaml @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +- from: + uri: "direct:route" + steps: + - aggregate: + strategy-ref: "aggregatorStrategy" + completion-size: 2 + correlation-expression: + simple: "${header.StockSymbol}" + - to: "mock:route" \ No newline at end of file diff --git a/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_filter.yaml b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_filter.yaml new file mode 100644 index 0000000..e34b1ff --- /dev/null +++ b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_filter.yaml @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +- from: + uri: "direct:route" + steps: + - filter: + simple: "${body.startsWith(\"a\")}" + steps: + - to: "mock:filter" + - to: "mock:route" +- from: + uri: "direct:flow" + steps: + - filter: + simple: "${body.startsWith(\"a\")}" + - to: "mock:flow" \ No newline at end of file diff --git a/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_split.yaml b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_split.yaml new file mode 100644 index 0000000..5011482 --- /dev/null +++ b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_split.yaml @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +- from: + uri: "direct:route" + steps: + - split: + tokenize: "," + steps: + - to: "mock:split" + - to: "mock:route" +- from: + uri: "direct:flow" + steps: + - split: + tokenize: "," + - to: "mock:flow" \ No newline at end of file