Updated Branches: refs/heads/camel-2.11.x 66e1c2812 -> d69fe7af4
CAMEL-6151: Added support for blocking direct-vm producers. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d69fe7af Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d69fe7af Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d69fe7af Branch: refs/heads/camel-2.11.x Commit: d69fe7af49e4ee723a4a80b3ed2584daa0a38e74 Parents: 66e1c28 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Jun 5 15:54:00 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Jun 5 15:54:54 2013 +0200 ---------------------------------------------------------------------- .../directvm/DirectVmBlockingProducer.java | 97 +++++++++++++++ .../component/directvm/DirectVmComponent.java | 19 +++ .../camel/component/directvm/DirectVmConsumer.java | 13 ++- .../camel/component/directvm/DirectVmEndpoint.java | 24 ++++- .../directvm/AbstractDirectVmTestSupport.java | 2 +- .../directvm/DirectVmProducerBlockingTest.java | 96 ++++++++++++++ 6 files changed, 248 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d69fe7af/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java new file mode 100644 index 0000000..77aa69c --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.directvm; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The direct-vm blocking producer. + * <p/> + * If blocking is enabled ({@code DirectVmEndpoint#isBlock}) then the DirectVmEndpoint will create an instance + * of this class instead of {@code DirectVmProducer}. + * This producer's {@code process} method will block for up to the configured duration ({@code DirectVmEndpoint#getTimeout}, + * defaults to 30 seconds) while no consumer is available. After that, if a consumer is still unavailable, a + * DirectVmConsumerNotAvailableException will be thrown. + * <p/> + * Implementation note: Concurrent Producers will block for the duration it takes to determine if a + * consumer is available, but actual consumer execution will happen concurrently. 
+ */
+public class DirectVmBlockingProducer extends DefaultAsyncProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(DirectVmBlockingProducer.class);
+
+    /** Upper bound, in milliseconds, for a single sleep between two consumer lookups. */
+    private static final long POLL_INTERVAL_MILLIS = 500L;
+
+    private final DirectVmEndpoint endpoint;
+
+    public DirectVmBlockingProducer(DirectVmEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Hands the exchange to the processor of the endpoint's consumer, waiting for a consumer
+     * to show up first when none is currently registered.
+     *
+     * @throws Exception if no consumer became available within the endpoint timeout, the wait
+     *                   was interrupted, or the consumer's processor failed
+     */
+    public void process(Exchange exchange) throws Exception {
+        getConsumer(exchange).getProcessor().process(exchange);
+    }
+
+    /**
+     * Asynchronous variant of {@link #process(Exchange)}. Any exception raised while obtaining or
+     * invoking the consumer is stored on the exchange and the callback is completed synchronously.
+     *
+     * @return the value returned by the consumer's async processor, or {@code true} when the
+     *         exchange was completed here because of a failure
+     */
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            return getConsumer(exchange).getAsyncProcessor().process(exchange, callback);
+        } catch (Exception e) {
+            // report the failure on the exchange and signal that we completed synchronously
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+    }
+
+    /**
+     * Returns the consumer currently registered for the endpoint, waiting up to the endpoint
+     * timeout for one to appear.
+     *
+     * @throws DirectVmConsumerNotAvailableException if there is still no consumer after waiting
+     */
+    protected DirectVmConsumer getConsumer(Exchange exchange) throws Exception {
+        DirectVmConsumer answer = endpoint.getConsumer();
+        if (answer == null) {
+            // okay then await until we have a consumer or we timed out
+            answer = awaitConsumer();
+            if (answer == null) {
+                LOG.warn("No consumers available on endpoint: {} to process: {}", endpoint, exchange);
+                throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
+            }
+        }
+
+        return answer;
+    }
+
+    /**
+     * Polls the endpoint for a consumer until one is found or the endpoint timeout has elapsed.
+     * <p/>
+     * Each sleep is capped at the time still remaining, so the total wait does not overshoot the
+     * configured timeout and a non-positive timeout does not wait at all.
+     *
+     * @return the consumer, or {@code null} if none became available in time
+     * @throws InterruptedException if the calling thread is interrupted while sleeping
+     */
+    private DirectVmConsumer awaitConsumer() throws InterruptedException {
+        final long timeout = endpoint.getTimeout();
+        final StopWatch watch = new StopWatch();
+
+        long remaining = timeout;
+        while (remaining > 0) {
+            // sleep a bit to give chance for the consumer to be ready, but never past the deadline
+            Thread.sleep(Math.min(POLL_INTERVAL_MILLIS, remaining));
+
+            long waited = watch.taken();
+            LOG.debug("Waited {} for consumer to be ready", waited);
+
+            DirectVmConsumer answer = endpoint.getConsumer();
+            if (answer != null) {
+                return answer;
+            }
+            // we are done once the timeout has been used up
+            remaining = timeout - waited;
+        }
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d69fe7af/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java
---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java index 513d174..6ad3d28 100644 --- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java @@ -38,6 +38,8 @@ public class DirectVmComponent extends DefaultComponent { // later in case the DirectVmEndpoint was re-created due the old was evicted from the endpoints LRUCache // on DefaultCamelContext private static final ConcurrentMap<String, DirectVmConsumer> CONSUMERS = new ConcurrentHashMap<String, DirectVmConsumer>(); + private boolean block; + private long timeout = 30000L; /** * Gets all the consumer endpoints. @@ -55,6 +57,8 @@ public class DirectVmComponent extends DefaultComponent { @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { DirectVmEndpoint answer = new DirectVmEndpoint(uri, this); + answer.setBlock(block); + answer.setTimeout(timeout); answer.configureProperties(parameters); return answer; } @@ -101,4 +105,19 @@ public class DirectVmComponent extends DefaultComponent { super.doStop(); } + public boolean isBlock() { + return block; + } + + public void setBlock(boolean block) { + this.block = block; + } + + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/d69fe7af/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java 
b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java index 5ab0bb0..037b7e2 100644 --- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java @@ -17,12 +17,13 @@ package org.apache.camel.component.directvm; import org.apache.camel.Processor; +import org.apache.camel.SuspendableService; import org.apache.camel.impl.DefaultConsumer; /** * The direct-vm consumer */ -public class DirectVmConsumer extends DefaultConsumer { +public class DirectVmConsumer extends DefaultConsumer implements SuspendableService { public DirectVmConsumer(DirectVmEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -45,4 +46,14 @@ public class DirectVmConsumer extends DefaultConsumer { super.doStop(); } + @Override + protected void doSuspend() throws Exception { + getEndpoint().getComponent().removeConsumer(getEndpoint(), this); + } + + @Override + protected void doResume() throws Exception { + getEndpoint().getComponent().addConsumer(getEndpoint(), this); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/d69fe7af/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java index 230b93f..d5a9885 100644 --- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java @@ -26,6 +26,9 @@ import org.apache.camel.impl.DefaultEndpoint; */ public class DirectVmEndpoint extends DefaultEndpoint { + private boolean block; + private long timeout = 30000L; + public DirectVmEndpoint(String endpointUri, DirectVmComponent component) { super(endpointUri, 
component); } @@ -37,7 +40,11 @@ public class DirectVmEndpoint extends DefaultEndpoint { @Override public Producer createProducer() throws Exception { - return new DirectVmProducer(this); + if (block) { + return new DirectVmBlockingProducer(this); + } else { + return new DirectVmProducer(this); + } } @Override @@ -56,4 +63,19 @@ public class DirectVmEndpoint extends DefaultEndpoint { return getComponent().getConsumer(this); } + public boolean isBlock() { + return block; + } + + public void setBlock(boolean block) { + this.block = block; + } + + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/d69fe7af/camel-core/src/test/java/org/apache/camel/component/directvm/AbstractDirectVmTestSupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/AbstractDirectVmTestSupport.java b/camel-core/src/test/java/org/apache/camel/component/directvm/AbstractDirectVmTestSupport.java index 2724813..f093a18 100644 --- a/camel-core/src/test/java/org/apache/camel/component/directvm/AbstractDirectVmTestSupport.java +++ b/camel-core/src/test/java/org/apache/camel/component/directvm/AbstractDirectVmTestSupport.java @@ -28,7 +28,7 @@ import org.junit.Before; /** * */ -public class AbstractDirectVmTestSupport extends ContextTestSupport { +public abstract class AbstractDirectVmTestSupport extends ContextTestSupport { protected CamelContext context2; protected ProducerTemplate template2; http://git-wip-us.apache.org/repos/asf/camel/blob/d69fe7af/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java 
b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java new file mode 100644 index 0000000..9c92785 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.component.directvm;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.StopWatch;
+
+/**
+ * Tests the {@code block} and {@code timeout} options of the direct-vm endpoint: a blocking
+ * producer must wait for a missing or suspended consumer and fail once the timeout is reached,
+ * or deliver the message if the consumer is resumed in time.
+ */
+public class DirectVmProducerBlockingTest extends ContextTestSupport {
+
+    /**
+     * With the only consumer suspended, a blocking send must wait for (most of) the timeout and
+     * then fail with a {@link DirectVmConsumerNotAvailableException}.
+     */
+    public void testProducerBlocksForSuspendedConsumer() throws Exception {
+        DirectVmEndpoint endpoint = getMandatoryEndpoint("direct-vm:suspended", DirectVmEndpoint.class);
+        endpoint.getConsumer().suspend();
+
+        StopWatch watch = new StopWatch();
+        try {
+            template.sendBody("direct-vm:suspended?block=true&timeout=2000", "hello world");
+            fail("Expected CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            DirectVmConsumerNotAvailableException cause = assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause());
+            assertIsInstanceOf(CamelExchangeException.class, cause);
+            assertTrue(watch.taken() > 1500);
+        }
+    }
+
+    /**
+     * Sending to an endpoint that no route consumes from must block and then fail the same way.
+     */
+    public void testProducerBlocksWithNoConsumers() throws Exception {
+        // no route in this test consumes from direct-vm:start, so nothing needs to be suspended
+        StopWatch watch = new StopWatch();
+        try {
+            template.sendBody("direct-vm:start?block=true&timeout=2000", "hello world");
+            fail("Expected CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            DirectVmConsumerNotAvailableException cause = assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause());
+            assertIsInstanceOf(CamelExchangeException.class, cause);
+
+            assertTrue(watch.taken() > 1500);
+        }
+    }
+
+    /**
+     * A blocking send started while the route is suspended must succeed once the route is
+     * resumed from another thread before the timeout expires.
+     */
+    public void testProducerBlocksResumeTest() throws Exception {
+        context.suspendRoute("foo");
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(2000);
+                        log.info("Resuming consumer");
+                        context.resumeRoute("foo");
+                    } catch (Exception e) {
+                        // the test cannot be failed from this thread; if the resume did not happen
+                        // the blocking send below times out, so just record the cause here
+                        log.warn("Failed to resume route foo", e);
+                    }
+                }
+            });
+
+            getMockEndpoint("mock:result").expectedMessageCount(1);
+
+            template.sendBody("direct-vm:suspended?block=true&timeout=5000", "hello world");
+
+            assertMockEndpointsSatisfied();
+        } finally {
+            // always release the helper thread, also when the send or the assertion fails
+            executor.shutdownNow();
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct-vm:suspended").routeId("foo")
+                    .to("mock:result");
+            }
+        };
+    }
+
+}