This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-2.21.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 20c3d9fb3e94bb59e68fba9d4ec3dedcf1ba74d0 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Sep 5 10:15:12 2018 +0200 CAMEL-12775: Fixed stub component with MEP InOut vs InOnly mode would expect a reply message or not. Otherwise InOut with no active consumer would block and timeout after 30 seconds and fail. Thanks to Guido Schreuder for reporting and the unit test. --- .../apache/camel/component/stub/StubEndpoint.java | 9 ++- .../apache/camel/component/stub/StubProducer.java | 59 ++++++++++++++++++ .../camel/component/stub/StubConsumerTest.java | 70 ++++++++++++++++++++++ .../apache/camel/component/stub/StubInOutTest.java | 53 ++++++++++++++++ .../org/apache/camel/component/stub/StubTest.java | 63 +++++++++++++++++++ 5 files changed, 253 insertions(+), 1 deletion(-) diff --git a/camel-core/src/main/java/org/apache/camel/component/stub/StubEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/stub/StubEndpoint.java index 69a6325..25b0e67 100644 --- a/camel-core/src/main/java/org/apache/camel/component/stub/StubEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/stub/StubEndpoint.java @@ -21,7 +21,10 @@ import java.util.concurrent.BlockingQueue; import org.apache.camel.Component; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.WaitForTaskToComplete; import org.apache.camel.component.seda.BlockingQueueFactory; +import org.apache.camel.component.seda.SedaProducer; import org.apache.camel.component.vm.VmConsumer; import org.apache.camel.component.vm.VmEndpoint; import org.apache.camel.spi.UriEndpoint; @@ -29,7 +32,7 @@ import org.apache.camel.spi.UriEndpoint; /** * The stub component provides a simple way to stub out any physical endpoints while in development or testing. * - * For example to run a route without needing to actually connect to a specific SMTP or Http endpoint. + * For example to run a route without needing to actually connect to a specific SMTP or HTTP endpoint. * Just add stub: in front of any endpoint URI to stub out the endpoint. * Internally the Stub component creates VM endpoints. The main difference between Stub and VM is that VM * will validate the URI and parameters you give it, so putting vm: in front of a typical URI with @@ -56,4 +59,8 @@ public class StubEndpoint extends VmEndpoint { return new StubConsumer(this, processor); } + @Override + public Producer createProducer() throws Exception { + return new StubProducer(this, getWaitForTaskToComplete(), getTimeout(), isBlockWhenFull(), getOfferTimeout()); + } } diff --git a/camel-core/src/main/java/org/apache/camel/component/stub/StubProducer.java b/camel-core/src/main/java/org/apache/camel/component/stub/StubProducer.java new file mode 100644 index 0000000..582aeb8 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/stub/StubProducer.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stub; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.WaitForTaskToComplete; +import org.apache.camel.component.seda.QueueReference; +import org.apache.camel.component.seda.SedaEndpoint; +import org.apache.camel.component.seda.SedaProducer; + +public class StubProducer extends SedaProducer { + + public StubProducer(SedaEndpoint endpoint, WaitForTaskToComplete waitForTaskToComplete, long timeout, boolean blockWhenFull, long offerTimeout) { + super(endpoint, waitForTaskToComplete, timeout, blockWhenFull, offerTimeout); + } + + @Override + public StubEndpoint getEndpoint() { + return (StubEndpoint) super.getEndpoint(); + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + AsyncCallback cb = callback; + + QueueReference queueReference = getEndpoint().getQueueReference(); + boolean empty = queueReference == null || !queueReference.hasConsumers(); + + // if no consumers then use InOnly mode + final ExchangePattern pattern = exchange.getPattern(); + if (empty && pattern != ExchangePattern.InOnly) { + exchange.setPattern(ExchangePattern.InOnly); + cb = doneSync -> { + // and restore the old pattern after processing + exchange.setPattern(pattern); + callback.done(doneSync); + }; + } + + return super.process(exchange, cb); + } + +} diff --git a/camel-core/src/test/java/org/apache/camel/component/stub/StubConsumerTest.java b/camel-core/src/test/java/org/apache/camel/component/stub/StubConsumerTest.java new file mode 100644 index 0000000..0af78cd --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/stub/StubConsumerTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stub; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ExchangePattern; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class StubConsumerTest extends ContextTestSupport { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:InOnly") + .setExchangePattern(ExchangePattern.InOnly) + .to("stub:foo") + .to("mock:result"); + + from("direct:InOut") + .setExchangePattern(ExchangePattern.InOut) + .to("stub:foo") + .to("mock:result"); + + from("stub:foo") + .transform().constant("Bye World"); + } + }; + } + + final void test(ExchangePattern mep) throws InterruptedException { + if (mep == ExchangePattern.InOut) { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + } else { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + } + getMockEndpoint("mock:result").setExpectedMessageCount(1); + + template.sendBody("direct:" + mep.name(), "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testInOnly() throws InterruptedException { + test(ExchangePattern.InOnly); + } + + @Test + public void testInOut() throws InterruptedException { + test(ExchangePattern.InOut); + } + +} diff --git a/camel-core/src/test/java/org/apache/camel/component/stub/StubInOutTest.java b/camel-core/src/test/java/org/apache/camel/component/stub/StubInOutTest.java new file mode 100644 index 0000000..e99c22b --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/stub/StubInOutTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stub; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.vm.AbstractVmTestSupport; +import org.junit.Test; + +public class StubInOutTest extends AbstractVmTestSupport { + + @Test + public void testInOut() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + template2.requestBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("stub:smtp://some.server.com?something=bar&whatnot=cheese").to("mock:result"); + } + }; + } + + @Override + protected RouteBuilder createRouteBuilderForSecondContext() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("stub:smtp://some.server.com?something=bar&whatnot=cheese"); + } + }; + } +} \ No newline at end of file diff --git a/camel-core/src/test/java/org/apache/camel/component/stub/StubTest.java b/camel-core/src/test/java/org/apache/camel/component/stub/StubTest.java new file mode 100644 index 0000000..bd96ec5 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/stub/StubTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stub; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ExchangePattern; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class StubTest extends ContextTestSupport { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:InOnly") + .setExchangePattern(ExchangePattern.InOnly) + .to("stub:foo") + .to("mock:result"); + + from("direct:InOut") + .setExchangePattern(ExchangePattern.InOut) + .to("stub:foo") + .to("mock:result"); + } + }; + } + + final void test(ExchangePattern mep) throws InterruptedException { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").setExpectedMessageCount(1); + + template.sendBody("direct:" + mep.name(), "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testInOnly() throws InterruptedException { + test(ExchangePattern.InOnly); + } + + @Test + public void testInOut() throws InterruptedException { + test(ExchangePattern.InOut); + } + +}