Author: davsclaus
Date: Sat Jun 25 12:00:52 2011
New Revision: 1139531

URL: http://svn.apache.org/viewvc?rev=1139531&view=rev
Log:
CAMEL-4153: Seda consumer now supports suspend/resume as a more gentle way of 
stopping it.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1139531&r1=1139530&r2=1139531&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
 Sat Jun 25 12:00:52 2011
@@ -20,6 +20,7 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -28,6 +29,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.SuspendableService;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
@@ -43,9 +45,10 @@ import org.slf4j.LoggerFactory;
  *
  * @version 
  */
-public class SedaConsumer extends ServiceSupport implements Consumer, 
Runnable, ShutdownAware {
+public class SedaConsumer extends ServiceSupport implements Consumer, 
Runnable, ShutdownAware, SuspendableService {
     private static final transient Logger LOG = 
LoggerFactory.getLogger(SedaConsumer.class);
 
+    private final AtomicInteger taskCount = new AtomicInteger();
     private CountDownLatch latch;
     private volatile boolean shutdownPending;
     private SedaEndpoint endpoint;
@@ -107,10 +110,40 @@ public class SedaConsumer extends Servic
         }
     }
 
+    @Override
+    public boolean isRunAllowed() {
+        if (isSuspending() || isSuspended()) {
+            // allow to run even if we are suspended as we want to
+            // keep the thread task running
+            return true;
+        }
+        return super.isRunAllowed();
+    }
+
     public void run() {
+        taskCount.incrementAndGet();
+        try {
+            doRun();
+        } finally {
+            taskCount.decrementAndGet();
+        }
+    }
+
+    protected void doRun() {
         BlockingQueue<Exchange> queue = endpoint.getQueue();
         // loop while we are allowed, or if we are stopping loop until the 
queue is empty
         while (queue != null && (isRunAllowed())) {
+            // do not poll if we are suspended
+            if (isSuspending() || isSuspended()) {
+                LOG.trace("Consumer is suspended so skip polling");
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    LOG.debug("Sleep interrupted, are we stopping? {}", 
isStopping() || isStopped());
+                }
+                continue;
+            }
+
             Exchange exchange = null;
             try {
                 exchange = queue.poll(1000, TimeUnit.MILLISECONDS);
@@ -190,12 +223,18 @@ public class SedaConsumer extends Servic
         latch = new CountDownLatch(endpoint.getConcurrentConsumers());
         shutdownPending = false;
 
-        int poolSize = endpoint.getConcurrentConsumers();
-        executor = endpoint.getCamelContext().getExecutorServiceStrategy()
-                        .newFixedThreadPool(this, endpoint.getEndpointUri(), 
poolSize);
-        for (int i = 0; i < poolSize; i++) {
-            executor.execute(this);
-        }
+        setupTasks();
+        endpoint.onStarted(this);
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        endpoint.onStopped(this);
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        setupTasks();
         endpoint.onStarted(this);
     }
 
@@ -210,4 +249,24 @@ public class SedaConsumer extends Servic
         }
     }
 
+    /**
+     * Sets up the thread pool and ensures tasks get executed (if needed)
+     */
+    private void setupTasks() {
+        int poolSize = endpoint.getConcurrentConsumers();
+
+        // create thread pool if needed
+        if (executor == null) {
+            executor = endpoint.getCamelContext().getExecutorServiceStrategy()
+                    .newFixedThreadPool(this, endpoint.getEndpointUri(), 
poolSize);
+        }
+
+        // submit needed number of tasks
+        int tasks = poolSize - taskCount.get();
+        LOG.debug("Creating {} consumer tasks", tasks);
+        for (int i = 0; i < tasks; i++) {
+            executor.execute(this);
+        }
+    }
+
 }

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java?rev=1139531&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java
 Sat Jun 25 12:00:52 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.ServiceHelper;
+
+/**
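+ * Unit test verifying that a {@link SedaConsumer} can be suspended and resumed on its own (not via its route).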
+ */
+public class SedaConsumerSuspendResumeTest extends ContextTestSupport {
+
+    public void testSuspendResume() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:bar");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("seda:foo", "A");
+
+        mock.assertIsSatisfied();
+
+        assertEquals("Started", context.getRouteStatus("foo").name());
+        assertEquals("Started", context.getRouteStatus("bar").name());
+
+        // suspend bar consumer (not the route)
+        SedaConsumer consumer = (SedaConsumer) 
context.getRoute("bar").getConsumer();
+
+        ServiceHelper.suspendService(consumer);
+        assertEquals("Suspended", consumer.getStatus().name());
+
+        // send a message to the route but the consumer is suspended
+        // so it should not route it
+        resetMocks();
+        mock.expectedMessageCount(0);
+
+        // wait a bit to ensure consumer is suspended, as it could be in a 
poll mode where
+        // it would poll and route (there is a little slack (up to 1 sec) 
before suspension takes effect)
+        Thread.sleep(2000);
+
+        template.sendBody("seda:foo", "B");
+        // wait 2 sec to ensure seda consumer thread would have tried to poll 
otherwise
+        mock.assertIsSatisfied(2000);
+
+        // resume consumer
+        resetMocks();
+        mock.expectedMessageCount(1);
+
+        // resume bar consumer (not the route)
+        ServiceHelper.resumeService(consumer);
+        assertEquals("Started", consumer.getStatus().name());
+
+        // the message should be routed now
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo").routeId("foo").to("seda:bar");
+
+                from("seda:bar").routeId("bar").to("mock:bar");
+            }
+        };
+    }
+}

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java?rev=1139531&r1=1139530&r2=1139531&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java
 Sat Jun 25 12:00:52 2011
@@ -40,8 +40,7 @@ public class RouteSedaSuspendResumeTest 
         mock.expectedMessageCount(0);
         context.suspendRoute("foo");
 
-        // seda consumer doesnt support suspension so it will stop instead
-        assertEquals("Stopped", context.getRouteStatus("foo").name());
+        assertEquals("Suspended", context.getRouteStatus("foo").name());
 
         template.sendBody("seda:foo", "B");
         mock.assertIsSatisfied(1000);

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java?rev=1139531&r1=1139530&r2=1139531&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
 Sat Jun 25 12:00:52 2011
@@ -51,8 +51,7 @@ public class TwoRouteSuspendResumeTest e
         mockBar.assertIsSatisfied();
         mock.assertIsSatisfied(1000);
 
-        // seda consumer doesnt support suspension so it will stop instead
-        assertEquals("Stopped", context.getRouteStatus("foo").name());
+        assertEquals("Suspended", context.getRouteStatus("foo").name());
         assertEquals("Started", context.getRouteStatus("bar").name());
 
         log.info("Resuming");


Reply via email to