Updated Branches: refs/heads/camel-2.11.x 6f10a76bc -> 6c38b9036 refs/heads/master e14c66357 -> 4684225fd
CAMEL-6151: Added support for blocking direct producers. Thanks to Aaron Whiteside for partial patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4684225f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4684225f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4684225f Branch: refs/heads/master Commit: 4684225fd15b04021c28605e7d37c1ef89b08d5f Parents: e14c663 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Jun 3 17:48:53 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Jun 3 17:48:53 2013 +0200 ---------------------------------------------------------------------- .../component/direct/DirectBlockingProducer.java | 97 +++++++++++++++ .../camel/component/direct/DirectConsumer.java | 5 + .../camel/component/direct/DirectEndpoint.java | 41 ++++++- .../direct/DirectProducerBlockingTest.java | 96 ++++++++++++++ .../camel/impl/CustomExchangePatternTest.java | 96 -------------- 5 files changed, 234 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4684225f/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java new file mode 100644 index 0000000..1b28bed --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.direct; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The direct producer. + * <p/> + * If blocking is enabled ({@code DirectEndpoint#isBlock}) then the DirectEndpoint will create an instance + * of this class instead of {@code DirectProducer}. + * This producer's {@code process} method will block for up to the configured duration ({@code DirectEndpoint#getTimeout}, + * defaults to 30 seconds). After that, if a consumer is still unavailable, a DirectConsumerNotAvailableException + * will be thrown. + * <p/> + * Implementation note: Concurrent Producers will block for the duration it takes to determine if a + * consumer is available, but actual consumer execution will happen concurrently. 
+ */ +public class DirectBlockingProducer extends DefaultAsyncProducer { + private static final transient Logger LOG = LoggerFactory.getLogger(DirectBlockingProducer.class); + private final DirectEndpoint endpoint; + + public DirectBlockingProducer(DirectEndpoint endpoint) { + super(endpoint); + this.endpoint = endpoint; + } + + public void process(Exchange exchange) throws Exception { + getConsumer(exchange).getProcessor().process(exchange); + } + + public boolean process(Exchange exchange, AsyncCallback callback) { + try { + return getConsumer(exchange).getAsyncProcessor().process(exchange, callback); + } catch (Exception e) { + exchange.setException(e); + callback.done(true); + return true; + } + } + + protected DirectConsumer getConsumer(Exchange exchange) throws Exception { + DirectConsumer answer = endpoint.getConsumer(); + if (answer == null) { + // okay then await until we have a consumer or we timed out + answer = awaitConsumer(); + if (answer == null) { + LOG.warn("No consumers available on endpoint: " + endpoint + " to process: " + exchange); + throw new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); + } + } + + return answer; + } + + private DirectConsumer awaitConsumer() throws InterruptedException { + DirectConsumer answer = null; + + StopWatch watch = new StopWatch(); + boolean done = false; + while (!done) { + // sleep a bit to give chance for the consumer to be ready + Thread.sleep(500); + if (LOG.isDebugEnabled()) { + LOG.debug("Waited {} for consumer to be ready", watch.taken()); + } + + answer = endpoint.getConsumer(); + if (answer != null) { + return answer; + } + // we are done if we hit the timeout + done = watch.taken() >= endpoint.getTimeout(); + } + return answer; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/4684225f/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java index f97128d..83dbbca 100644 --- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java @@ -38,6 +38,11 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Su } @Override + public DirectEndpoint getEndpoint() { + return (DirectEndpoint) super.getEndpoint(); + } + + @Override protected void doStart() throws Exception { // add consumer to endpoint boolean existing = this == endpoint.getConsumer(); http://git-wip-us.apache.org/repos/asf/camel/blob/4684225f/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java index 9936505..844a74b 100644 --- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java @@ -24,6 +24,7 @@ import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.util.ObjectHelper; /** * Represents a direct endpoint that synchronously invokes the consumer of the @@ -34,6 +35,8 @@ import org.apache.camel.impl.DefaultEndpoint; public class DirectEndpoint extends DefaultEndpoint { private volatile Map<String, DirectConsumer> consumers; + private boolean block; + private long timeout = 30000L; public DirectEndpoint() { this.consumers = new HashMap<String, DirectConsumer>(); @@ -49,7 +52,11 @@ public class DirectEndpoint extends DefaultEndpoint { } public Producer createProducer() 
throws Exception { - return new DirectProducer(this); + if (block) { + return new DirectBlockingProducer(this); + } else { + return new DirectProducer(this); + } } public Consumer createConsumer(Processor processor) throws Exception { @@ -63,23 +70,47 @@ public class DirectEndpoint extends DefaultEndpoint { } public void addConsumer(DirectConsumer consumer) { - String key = consumer.getEndpoint().getEndpointKey(); + String key = consumer.getEndpoint().getKey(); consumers.put(key, consumer); } public void removeConsumer(DirectConsumer consumer) { - String key = consumer.getEndpoint().getEndpointKey(); + String key = consumer.getEndpoint().getKey(); consumers.remove(key); } public boolean hasConsumer(DirectConsumer consumer) { - String key = consumer.getEndpoint().getEndpointKey(); + String key = consumer.getEndpoint().getKey(); return consumers.containsKey(key); } public DirectConsumer getConsumer() { - String key = getEndpointKey(); + String key = getKey(); return consumers.get(key); } + public boolean isBlock() { + return block; + } + + public void setBlock(boolean block) { + this.block = block; + } + + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + protected String getKey() { + String uri = getEndpointUri(); + if (uri.indexOf('?') != -1) { + return ObjectHelper.before(uri, "?"); + } else { + return uri; + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/4684225f/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java b/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java new file mode 100644 index 0000000..5322202 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java @@ -0,0 
+1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.direct; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.util.StopWatch; + +public class DirectProducerBlockingTest extends ContextTestSupport { + + public void testProducerBlocksForSuspendedConsumer() throws Exception { + DirectEndpoint endpoint = getMandatoryEndpoint("direct:suspended", DirectEndpoint.class); + endpoint.getConsumer().suspend(); + + StopWatch watch = new StopWatch(); + try { + template.sendBody("direct:suspended?block=true&timeout=2000", "hello world"); + fail("Expected CamelExecutionException"); + } catch (CamelExecutionException e) { + DirectConsumerNotAvailableException cause = assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause()); + assertIsInstanceOf(CamelExchangeException.class, cause); + assertTrue(watch.taken() > 1500); + } + } + + public void testProducerBlocksWithNoConsumers() throws Exception { + DirectEndpoint endpoint = 
getMandatoryEndpoint("direct:suspended", DirectEndpoint.class); + endpoint.getConsumer().suspend(); + + StopWatch watch = new StopWatch(); + try { + template.sendBody("direct:start?block=true&timeout=2000", "hello world"); + fail("Expected CamelExecutionException"); + } catch (CamelExecutionException e) { + DirectConsumerNotAvailableException cause = assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause()); + assertIsInstanceOf(CamelExchangeException.class, cause); + + assertTrue(watch.taken() > 1500); + } + } + + public void testProducerBlocksResumeTest() throws Exception { + context.suspendRoute("foo"); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(2000); + log.info("Resuming consumer"); + context.resumeRoute("foo"); + } catch (Exception e) { + // ignore + } + } + }); + + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:suspended?block=true&timeout=5000", "hello world"); + + assertMockEndpointsSatisfied(); + + executor.shutdownNow(); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("direct:suspended").routeId("foo") + .to("mock:result"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/4684225f/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java b/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java deleted file mode 100644 index cc942df..0000000 --- a/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.impl; - -import java.util.List; - -import org.apache.camel.ContextTestSupport; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.Processor; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; - -/** - * @version - */ -public class CustomExchangePatternTest extends ContextTestSupport { - protected MockEndpoint resultEndpoint; - - public void testInOut() throws Exception { - final ExchangePattern expectedPattern = ExchangePattern.InOut; - - template.send("direct:start", expectedPattern, new Processor() { - public void process(Exchange exchange) throws Exception { - assertEquals("MEP", expectedPattern, exchange.getPattern()); - exchange.getIn().setBody("<hello>world!</hello>"); - } - }); - - resultEndpoint.assertIsSatisfied(); - assertReceivedExpectedPattern(expectedPattern); - } - - public void testInOnly() throws Exception { - ExchangePattern expectedPattern = ExchangePattern.InOnly; - - template.send("direct:start", expectedPattern, new Processor() { - public void process(Exchange exchange) throws Exception { - exchange.getIn().setBody("<hello>world!</hello>"); - } - }); - - resultEndpoint.assertIsSatisfied(); - 
assertReceivedExpectedPattern(expectedPattern); - } - - public void testInOutViaUri() throws Exception { - final ExchangePattern expectedPattern = ExchangePattern.InOut; - - template.send("direct:start?exchangePattern=InOut", new Processor() { - public void process(Exchange exchange) throws Exception { - assertEquals("MEP", expectedPattern, exchange.getPattern()); - exchange.getIn().setBody("<hello>world!</hello>"); - } - }); - - resultEndpoint.assertIsSatisfied(); - assertReceivedExpectedPattern(expectedPattern); - } - - @Override - protected void setUp() throws Exception { - super.setUp(); - resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); - resultEndpoint.expectedMessageCount(1); - } - - protected void assertReceivedExpectedPattern(ExchangePattern expectedPattern) { - List<Exchange> list = resultEndpoint.getReceivedExchanges(); - Exchange exchange = list.get(0); - assertEquals("MEP", expectedPattern, exchange.getPattern()); - } - - protected RouteBuilder createRouteBuilder() { - return new RouteBuilder() { - public void configure() { - from("direct:start").to("mock:result"); - from("direct:start?exchangePattern=InOut").to("mock:result"); - } - }; - } -} \ No newline at end of file