This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new f5d3a0d CAMEL-15578: Loop EIP - Add option to break out if shutting down Camel (#4811) f5d3a0d is described below commit f5d3a0de679be01cd3120c1ee17cbf1d5284f9e4 Author: mschnitzler <schnitzler.michael+git...@gmail.com> AuthorDate: Tue Dec 22 06:15:31 2020 +0100 CAMEL-15578: Loop EIP - Add option to break out if shutting down Camel (#4811) * CAMEL-15578: add option "breakOnShutdown" to Loop EIP Setting the "breakOnShutdown" option on the Loop EIP makes it possible to break out of the loop early if the context is shutting down. * CAMEL-15578: add missing license header * CAMEL-15578: add option "breakOnShutdown" to Loop EIP --- .../org/apache/camel/catalog/models/loop.json | 1 + .../apache/camel/catalog/schemas/camel-spring.xsd | 8 +++ .../resources/org/apache/camel/model/loop.json | 1 + .../org/apache/camel/model/LoopDefinition.java | 22 +++++++ .../org/apache/camel/processor/LoopProcessor.java | 34 ++++++++-- .../java/org/apache/camel/reifier/LoopReifier.java | 3 +- .../camel/processor/LoopBreakOnShutdownTest.java | 75 ++++++++++++++++++++++ .../camel/processor/LoopNoBreakOnShutdownTest.java | 72 +++++++++++++++++++++ .../java/org/apache/camel/xml/in/ModelParser.java | 1 + 9 files changed, 212 insertions(+), 5 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/loop.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/loop.json index 9f23152..82adb12 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/loop.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/loop.json @@ -14,6 +14,7 @@ "expression": { "kind": "expression", "displayName": "Expression", "required": true, "type": "object", "javaType": "org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", 
"hl7terser", "joor", "jsonpath", "language", "method", "mvel", "ognl", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "autowired": false, "secret": false, "description": "Expression to [...] "copy": { "kind": "attribute", "displayName": "Copy", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the copy attribute is true, a copy of the input Exchange is used for each iteration. That means each iteration will start from a copy of the same message. By default loop will loop the same exchange all over, so each iteration may have different message content." }, "doWhile": { "kind": "attribute", "displayName": "Do While", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enables the while loop that loops until the predicate evaluates to false or null." }, + "breakOnShutdown": { "kind": "attribute", "displayName": "Break On Shutdown", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the breakOnShutdown attribute is true, then the loop will not iterate until it reaches the end when Camel is shut down." 
}, "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" } } diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd index 0c5d4e3..0530ef7 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd @@ -5586,6 +5586,14 @@ null. Default value: false ]]></xs:documentation> </xs:annotation> </xs:attribute> + <xs:attribute name="breakOnShutdown" type="xs:string"> + <xs:annotation> + <xs:documentation xml:lang="en"><![CDATA[ +If the breakOnShutdown attribute is true, then the loop will not iterate until +it reaches the end when Camel is shut down. 
Default value: false + ]]></xs:documentation> + </xs:annotation> + </xs:attribute> </xs:extension> </xs:complexContent> </xs:complexType> diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/loop.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/loop.json index 9f23152..82adb12 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/loop.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/loop.json @@ -14,6 +14,7 @@ "expression": { "kind": "expression", "displayName": "Expression", "required": true, "type": "object", "javaType": "org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "joor", "jsonpath", "language", "method", "mvel", "ognl", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "autowired": false, "secret": false, "description": "Expression to [...] "copy": { "kind": "attribute", "displayName": "Copy", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the copy attribute is true, a copy of the input Exchange is used for each iteration. That means each iteration will start from a copy of the same message. By default loop will loop the same exchange all over, so each iteration may have different message content." }, "doWhile": { "kind": "attribute", "displayName": "Do While", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enables the while loop that loops until the predicate evaluates to false or null." 
}, + "breakOnShutdown": { "kind": "attribute", "displayName": "Break On Shutdown", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the breakOnShutdown attribute is true, then the loop will not iterate until it reaches the end when Camel is shut down." }, "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" } } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/LoopDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/LoopDefinition.java index 7ec080d..02e62c5 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/LoopDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/LoopDefinition.java @@ -40,6 +40,9 @@ public class LoopDefinition extends OutputExpressionNode { @XmlAttribute @Metadata(javaType = "java.lang.Boolean") private String doWhile; + @XmlAttribute + @Metadata(javaType = "java.lang.Boolean") + private String breakOnShutdown; public LoopDefinition() { } @@ -92,6 +95,25 @@ public class LoopDefinition extends OutputExpressionNode { this.copy = copy; } + public LoopDefinition breakOnShutdown() { + setBreakOnShutdown(Boolean.toString(true)); + return this; + } + + /** + * If the breakOnShutdown attribute is true, then the loop will not iterate until it reaches the end when Camel is + * shut down. 
+ * + * @param breakOnShutdown a Boolean-parsable String + */ + public void setBreakOnShutdown(String breakOnShutdown) { + this.breakOnShutdown = breakOnShutdown; + } + + public String getBreakOnShutdown() { + return breakOnShutdown; + } + @Override public String toString() { return "Loop[" + getExpression() + " -> " + getOutputs() + "]"; diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java index 834dc8a..1a5160d 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java @@ -24,10 +24,12 @@ import org.apache.camel.ExtendedCamelContext; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Predicate; import org.apache.camel.Processor; +import org.apache.camel.ShutdownRunningTask; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.spi.ShutdownAware; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; import org.slf4j.Logger; @@ -38,32 +40,36 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing; /** * The processor which sends messages in a loop. 
*/ -public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware { +public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware, ShutdownAware { private static final Logger LOG = LoggerFactory.getLogger(LoopProcessor.class); private String id; private String routeId; + private LoopState state; + private boolean shutdownPending; private final CamelContext camelContext; private final ReactiveExecutor reactiveExecutor; private final Expression expression; private final Predicate predicate; private final boolean copy; + private final boolean breakOnShutdown; public LoopProcessor(CamelContext camelContext, Processor processor, Expression expression, Predicate predicate, - boolean copy) { + boolean copy, boolean breakOnShutdown) { super(processor); this.camelContext = camelContext; this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); this.expression = expression; this.predicate = predicate; this.copy = copy; + this.breakOnShutdown = breakOnShutdown; } @Override public boolean process(Exchange exchange, AsyncCallback callback) { try { - LoopState state = new LoopState(exchange, callback); + state = new LoopState(exchange, callback); if (exchange.isTransacted()) { reactiveExecutor.scheduleSync(state); @@ -78,6 +84,21 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, } } + @Override + public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { + return !breakOnShutdown; + } + + @Override + public int getPendingExchangesSize() { + return state.getPendingSize(); + } + + @Override + public void prepareShutdown(boolean suspendOnly, boolean forced) { + shutdownPending = true; + } + /** * Class holding state for loop processing */ @@ -111,9 +132,10 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, boolean cont = continueProcessing(current, "so breaking out of loop", LOG); boolean 
doWhile = predicate == null || predicate.matches(current); boolean doLoop = expression == null || index < count; + boolean isStopping = shutdownPending && breakOnShutdown; // iterate - if (cont && doWhile && doLoop) { + if (cont && doWhile && doLoop && !isStopping) { // and prepare for next iteration current = prepareExchange(exchange, index); @@ -143,6 +165,10 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, } } + public int getPendingSize() { + return Math.max(count - index, 0); + } + @Override public String toString() { return "LoopState"; diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/LoopReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/LoopReifier.java index 219d658..bcdc731 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/LoopReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/LoopReifier.java @@ -35,6 +35,7 @@ public class LoopReifier extends ExpressionReifier<LoopDefinition> { Processor output = this.createChildProcessor(true); boolean isCopy = parseBoolean(definition.getCopy(), false); boolean isWhile = parseBoolean(definition.getDoWhile(), false); + boolean isBreakOnShutdown = parseBoolean(definition.getBreakOnShutdown(), false); Predicate predicate = null; Expression expression = null; @@ -43,7 +44,7 @@ public class LoopReifier extends ExpressionReifier<LoopDefinition> { } else { expression = createExpression(definition.getExpression()); } - return new LoopProcessor(camelContext, output, expression, predicate, isCopy); + return new LoopProcessor(camelContext, output, expression, predicate, isCopy, isBreakOnShutdown); } } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/LoopBreakOnShutdownTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/LoopBreakOnShutdownTest.java new file mode 100644 index 0000000..5127a7d --- /dev/null +++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/LoopBreakOnShutdownTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.concurrent.CompletableFuture; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ShutdownRoute; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; + +class LoopBreakOnShutdownTest extends ContextTestSupport { + + private static final int LOOP_COUNT = 100; + + @Test + void testLoopBreakOnShutdown() { + MockEndpoint mock = getMockEndpoint("mock:result"); + + CompletableFuture<Object> future = template.asyncSendBody("seda:foo", 0); + await().atMost(1, SECONDS).until(future::isDone); + + context.stop(); + + int received = mock.getReceivedCounter(); + assertThat(received, is(lessThan(LOOP_COUNT))); + } + + @Override + 
@BeforeEach + public void setUp() throws Exception { + super.setUp(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + + return new RouteBuilder() { + public void configure() { + + from("seda:foo") + .startupOrder(1) + .loop(LOOP_COUNT).breakOnShutdown().delay(50) + .to("seda:bar"); + + from("seda:bar") + .startupOrder(2) + .shutdownRoute(ShutdownRoute.Defer) + .to("mock:result"); + } + }; + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/LoopNoBreakOnShutdownTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/LoopNoBreakOnShutdownTest.java new file mode 100644 index 0000000..90ed28b --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/LoopNoBreakOnShutdownTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import java.util.concurrent.CompletableFuture; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ShutdownRoute; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; + +class LoopNoBreakOnShutdownTest extends ContextTestSupport { + + private static final int LOOP_COUNT = 100; + + @Test + void testLoopNoBreakOnShutdown() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMinimumMessageCount(LOOP_COUNT); + + CompletableFuture<Object> future = template.asyncSendBody("seda:foo", "foo"); + await().atMost(1, SECONDS).until(future::isDone); + + context.stop(); + + mock.assertIsSatisfied(); + } + + @Override + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + + return new RouteBuilder() { + public void configure() { + + from("seda:foo") + .startupOrder(1) + .loop(LOOP_COUNT).delay(50) + .to("seda:bar"); + + from("seda:bar") + .startupOrder(2) + .shutdownRoute(ShutdownRoute.Defer) + .to("mock:result"); + } + }; + } +} diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java index fca47d5..7424b51 100644 --- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java +++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java @@ -573,6 +573,7 @@ public class ModelParser extends BaseParser { protected LoopDefinition doParseLoopDefinition() throws IOException, XmlPullParserException { return doParse(new LoopDefinition(), (def, key, val) -> { switch (key) { + case "breakOnShutdown": 
def.setBreakOnShutdown(val); break; case "copy": def.setCopy(val); break; case "doWhile": def.setDoWhile(val); break; default: return processorDefinitionAttributeHandler().accept(def, key, val);