Author: davsclaus Date: Sat Jun 25 12:00:52 2011 New Revision: 1139531 URL: http://svn.apache.org/viewvc?rev=1139531&view=rev Log: CAMEL-4153: Seda consumer now supports suspend/resume as a more gentle way of stopping it.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java camel/trunk/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1139531&r1=1139530&r2=1139531&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Sat Jun 25 12:00:52 2011 @@ -20,6 +20,7 @@ import java.util.concurrent.BlockingQueu import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -28,6 +29,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; +import org.apache.camel.SuspendableService; import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.ServiceSupport; import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; @@ -43,9 +45,10 @@ import org.slf4j.LoggerFactory; * * @version */ -public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware { +public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware, SuspendableService { private static final transient Logger LOG = LoggerFactory.getLogger(SedaConsumer.class); + private final AtomicInteger taskCount = new AtomicInteger(); private CountDownLatch latch; private volatile boolean shutdownPending; private SedaEndpoint endpoint; @@ -107,10 +110,40 @@ public class SedaConsumer extends Servic } } + @Override + public boolean isRunAllowed() { + if (isSuspending() || isSuspended()) { + // allow to run even if we are suspended as we want to + // keep the thread task running + return true; + } + return super.isRunAllowed(); + } + public void run() { + taskCount.incrementAndGet(); + try { + doRun(); + } finally { + taskCount.decrementAndGet(); + } + } + + protected void doRun() { BlockingQueue<Exchange> queue = endpoint.getQueue(); // loop while we are allowed, or if we are stopping loop until the queue is empty while (queue != null && (isRunAllowed())) { + // do not poll if we are suspended + if (isSuspending() || isSuspended()) { + LOG.trace("Consumer is suspended so skip polling"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); + } + continue; + } + Exchange exchange = null; try { exchange = queue.poll(1000, TimeUnit.MILLISECONDS); @@ -190,12 +223,18 @@ public class SedaConsumer extends Servic latch = new CountDownLatch(endpoint.getConcurrentConsumers()); shutdownPending = false; - int poolSize = endpoint.getConcurrentConsumers(); - executor = endpoint.getCamelContext().getExecutorServiceStrategy() - .newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); - for (int i = 0; i < poolSize; i++) { - executor.execute(this); - } + setupTasks(); + endpoint.onStarted(this); + } + + @Override + protected void doSuspend() throws Exception { + endpoint.onStopped(this); + } + + @Override + protected void doResume() throws Exception { + setupTasks(); endpoint.onStarted(this); } @@ -210,4 +249,24 @@ public class SedaConsumer extends Servic } } + /** + * Setup the thread pool and ensures tasks gets executed (if needed) + */ + private void setupTasks() { + int poolSize = endpoint.getConcurrentConsumers(); + + // create thread pool if needed + if (executor == null) { + executor = endpoint.getCamelContext().getExecutorServiceStrategy() + .newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); + } + + // submit needed number of tasks + int tasks = poolSize - taskCount.get(); + LOG.debug("Creating {} consumer tasks", tasks); + for (int i = 0; i < tasks; i++) { + executor.execute(this); + } + } + } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java?rev=1139531&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java Sat Jun 25 12:00:52 2011 @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.ServiceHelper; + +/** + * + */ +public class SedaConsumerSuspendResumeTest extends ContextTestSupport { + + public void testSuspendResume() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:bar"); + mock.expectedMessageCount(1); + + template.sendBody("seda:foo", "A"); + + mock.assertIsSatisfied(); + + assertEquals("Started", context.getRouteStatus("foo").name()); + assertEquals("Started", context.getRouteStatus("bar").name()); + + // suspend bar consumer (not the route) + SedaConsumer consumer = (SedaConsumer) context.getRoute("bar").getConsumer(); + + ServiceHelper.suspendService(consumer); + assertEquals("Suspended", consumer.getStatus().name()); + + // send a message to the route but the consumer is suspended + // so it should not route it + resetMocks(); + mock.expectedMessageCount(0); + + // wait a bit to ensure consumer is suspended, as it could be in a poll mode where + // it would poll and route (there is a little slack (up till 1 sec) before suspension is empowered) + Thread.sleep(2000); + + template.sendBody("seda:foo", "B"); + // wait 2 sec to ensure seda consumer thread would have tried to poll otherwise + mock.assertIsSatisfied(2000); + + // resume consumer + resetMocks(); + mock.expectedMessageCount(1); + + // resume bar consumer (not the route) + ServiceHelper.resumeService(consumer); + assertEquals("Started", consumer.getStatus().name()); + + // the message should be routed now + mock.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo").routeId("foo").to("seda:bar"); + + from("seda:bar").routeId("bar").to("mock:bar"); + } + }; + } +} Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java?rev=1139531&r1=1139530&r2=1139531&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java Sat Jun 25 12:00:52 2011 @@ -40,8 +40,7 @@ public class RouteSedaSuspendResumeTest mock.expectedMessageCount(0); context.suspendRoute("foo"); - // seda consumer doesnt support suspension so it will stop instead - assertEquals("Stopped", context.getRouteStatus("foo").name()); + assertEquals("Suspended", context.getRouteStatus("foo").name()); template.sendBody("seda:foo", "B"); mock.assertIsSatisfied(1000); Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java?rev=1139531&r1=1139530&r2=1139531&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java Sat Jun 25 12:00:52 2011 @@ -51,8 +51,7 @@ public class TwoRouteSuspendResumeTest e mockBar.assertIsSatisfied(); mock.assertIsSatisfied(1000); - // seda consumer doesnt support suspension so it will stop instead - assertEquals("Stopped", context.getRouteStatus("foo").name()); + assertEquals("Suspended", context.getRouteStatus("foo").name()); assertEquals("Started", context.getRouteStatus("bar").name()); log.info("Resuming");