This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 0303d2349e9 CAMEL-18713: Loop EIP - Negative pending size on shutdown if using loopWhile. Thanks to Petr Holubec for reporting. 0303d2349e9 is described below commit 0303d2349e98f6f5948621faa74e61c4aacaeeb3 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Dec 4 13:17:27 2022 +0100 CAMEL-18713: Loop EIP - Negative pending size on shutdown if using loopWhile. Thanks to Petr Holubec for reporting. --- .../org/apache/camel/processor/LoopProcessor.java | 6 ++- .../camel/processor/LoopDoWhileTaskCountTest.java | 52 ++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java index 29f4a1aa924..f4709f8dfdc 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java @@ -124,6 +124,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, // but evaluation result is a textual representation of a numeric value. String text = expression.evaluate(exchange, String.class); count = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text); + // keep track of pending task if loop with fixed value taskCount.add(count); exchange.setProperty(ExchangePropertyKey.LOOP_SIZE, count); } @@ -150,7 +151,10 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, processor.process(current, doneSync -> { // increment counter after done index++; - taskCount.decrement(); + if (expression != null) { + // keep track of pending task if loop with fixed value + taskCount.decrement(); + } reactiveExecutor.schedule(this); }); } else { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTaskCountTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTaskCountTest.java new file mode 100644 index 00000000000..b2fd5e734e3 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTaskCountTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class LoopDoWhileTaskCountTest extends ContextTestSupport { + + @Test + public void testLoopDoWhileSimple() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA"); + getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA"); + + template.sendBody("direct:simple", "A"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:simple").loopDoWhile(simple("${body.length} <= 5")).id("myLoop") + .process(exchange -> { + LoopProcessor lp = exchange.getContext().getProcessor("myLoop", LoopProcessor.class); + int size = lp.getPendingExchangesSize(); + // task count should be zero as its while loop + Assertions.assertEquals(0, size); + }).to("mock:loop").transform(body().append("A")) + .end().to("mock:result"); + } + }; + } +}