Repository: camel Updated Branches: refs/heads/camel-2.17.x 484707506 -> 4d6da3b7b refs/heads/master d4ddff392 -> b3a819327
CAMEL-9863: Fixed bug in doWhileLoop when calling async component. Thanks to Sanigo for unit test. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b3a81932 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b3a81932 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b3a81932 Branch: refs/heads/master Commit: b3a819327d5ed561a1c563aa9138765109cd382a Parents: d4ddff3 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Apr 13 11:05:45 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Apr 13 11:20:10 2016 +0200 ---------------------------------------------------------------------- .../apache/camel/processor/LoopProcessor.java | 71 ++++++++------------ .../async/AsyncEndpointDoWhileLoopTest.java | 54 +++++++++++++++ .../ahc/AhcProducePostDoWhileTest.java | 55 +++++++++++++++ 3 files changed, 137 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b3a81932/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java index 2fb244d..07bb255 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java @@ -86,16 +86,18 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, } // loop synchronously - while ((predicate != null && doWhile.get()) || (index.get() < count.get())) { + while ((predicate != null && doWhile.get()) || (index.get() < count.get())) { // and prepare for next iteration // if (!copy) target = exchange; else copy of original target = prepareExchange(exchange, index.get(), original); + // the following process method will in the done method re-evaluate the predicate + // so we do not need to do it here as well boolean sync = process(target, callback, index, count, doWhile, original); if (!sync) { LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId()); - // the remainder of the routing slip will be completed async + // the remainder of the loop will be completed async // so we break out now, then the callback will be invoked which then continue routing from where we left here return false; } @@ -106,21 +108,6 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, if (!continueProcessing(target, "so breaking out of loop", LOG)) { break; } - - // increment counter before next loop - index.getAndIncrement(); - - // evaluate predicate - if (predicate != null) { - try { - boolean result = predicate.matches(exchange); - doWhile.set(result); - } catch (Exception e) { - // break out looping due that exception - exchange.setException(e); - doWhile.set(false); - } - } } // we are done so prepare the result @@ -137,21 +124,39 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, // set current index as property LOG.debug("LoopProcessor: iteration #{}", index.get()); exchange.setProperty(Exchange.LOOP_INDEX, index.get()); - + boolean sync = processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { - // we only have to handle async completion of the routing slip + // increment counter after done + index.getAndIncrement(); + + // evaluate predicate for next loop + if (predicate != null && index.get() > 0) { + try { + boolean result = predicate.matches(exchange); + doWhile.set(result); + } catch (Exception e) { + // break out looping due that exception + exchange.setException(e); + doWhile.set(false); + } + } + + // we only have to handle async completion of the loop + // (as the sync is done in the outer processor) if (doneSync) { return; } Exchange target = exchange; - // increment index as we have just processed once - index.getAndIncrement(); - // continue looping asynchronously - while ((predicate != null && doWhile.get()) || (index.get() < count.get())) { + while ((predicate != null && doWhile.get()) || (index.get() < count.get())) { + + // check for error if so we should break out + if (!continueProcessing(target, "so breaking out of loop", LOG)) { + break; + } // and prepare for next iteration target = prepareExchange(exchange, index.get(), original); @@ -164,26 +169,6 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, // so we break out now, then the callback will be invoked which then continue routing from where we left here return; } - - // check for error if so we should break out - if (!continueProcessing(target, "so breaking out of loop", LOG)) { - break; - } - - // increment counter before next loop - index.getAndIncrement(); - - // evaluate predicate - if (predicate != null) { - try { - boolean result = predicate.matches(exchange); - doWhile.set(result); - } catch (Exception e) { - // break out looping due that exception - exchange.setException(e); - doWhile.set(false); - } - } } // we are done so prepare the result http://git-wip-us.apache.org/repos/asf/camel/blob/b3a81932/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDoWhileLoopTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDoWhileLoopTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDoWhileLoopTest.java new file mode 100644 index 0000000..a551727 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDoWhileLoopTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.async; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; + +public class AsyncEndpointDoWhileLoopTest extends ContextTestSupport { + + public void testAsyncEndpoint() throws Exception { + getMockEndpoint("mock:line").expectedBodiesReceived("Bye Camel", "Bye Camel", "Bye Camel", "Bye Camel"); + getMockEndpoint("mock:result").expectedBodiesReceived("done"); + + template.requestBody("direct:start", "World", String.class); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.addComponent("async", new MyAsyncComponent()); + + from("direct:start") + .loopDoWhile(body().isNotEqualTo("done")) + .to("async:bye:camel") + .to("mock:line") + .filter(exchangeProperty(Exchange.LOOP_INDEX).isEqualTo(3)) + .setBody().constant("done") + .end() + .end() + .to("mock:result"); + } + }; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b3a81932/components/camel-ahc/src/test/java/org/apache/camel/component/ahc/AhcProducePostDoWhileTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ahc/src/test/java/org/apache/camel/component/ahc/AhcProducePostDoWhileTest.java b/components/camel-ahc/src/test/java/org/apache/camel/component/ahc/AhcProducePostDoWhileTest.java new file mode 100644 index 0000000..b7193a2 --- /dev/null +++ b/components/camel-ahc/src/test/java/org/apache/camel/component/ahc/AhcProducePostDoWhileTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ahc; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class AhcProducePostDoWhileTest extends BaseAhcTest { + + @Test + public void testAhcDoWhile() throws Exception { + getMockEndpoint("mock:line").expectedBodiesReceived("Bye World", "Bye Bye World", "Bye Bye Bye World", "Bye Bye Bye Bye World"); + getMockEndpoint("mock:result").expectedBodiesReceived("done"); + + template.requestBody("direct:start", "World", String.class); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").streamCaching() + .loopDoWhile(body().isNotEqualTo("done")) + .to(getAhcEndpointUri()) + .to("mock:line") + .filter(exchangeProperty(Exchange.LOOP_INDEX).isEqualTo(3)) + .setBody().constant("done") + .end() + .end() + .to("mock:result"); + + from(getTestServerEndpointUri()) + .transform(simple("Bye ${body}")); + } + }; + } +}