Repository: camel
Updated Branches:
  refs/heads/master 64d08a7e3 -> b80021a15


Fix for https://issues.apache.org/jira/browse/CAMEL-5113: parallel and
fault-tolerant message processing for SQS endpoints


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b80021a1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b80021a1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b80021a1

Branch: refs/heads/master
Commit: b80021a1551213e155c4ec8b1464831e9a6ab1d3
Parents: 64d08a7
Author: Christian Posta <christian.po...@gmail.com>
Authored: Thu Jan 8 16:31:38 2015 -0700
Committer: Christian Posta <christian.po...@gmail.com>
Committed: Thu Jan 8 16:31:38 2015 -0700

----------------------------------------------------------------------
 .../DefaultScheduledPollConsumerScheduler.java  |  47 ++++++---
 .../component/aws/sqs/SqsConfiguration.java     |  10 ++
 .../camel/component/aws/sqs/SqsEndpoint.java    |   4 +
 .../aws/sqs/SqsConcurrentConsumerTest.java      | 100 +++++++++++++++++++
 4 files changed, 148 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b80021a1/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
index 729ee75..db4e4d1 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.impl;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -38,8 +40,9 @@ public class DefaultScheduledPollConsumerScheduler extends 
org.apache.camel.supp
     private Consumer consumer;
     private ScheduledExecutorService scheduledExecutorService;
     private boolean shutdownExecutor;
-    private volatile ScheduledFuture<?> future;
+    private volatile List<ScheduledFuture<?>> futures = new 
ArrayList<ScheduledFuture<?>>();
     private Runnable task;
+    private int concurrentTasks = 1;
 
     private long initialDelay = 1000;
     private long delay = 500;
@@ -94,6 +97,14 @@ public class DefaultScheduledPollConsumerScheduler extends 
org.apache.camel.supp
         this.scheduledExecutorService = scheduledExecutorService;
     }
 
+    public int getConcurrentTasks() {
+        return concurrentTasks;
+    }
+
+    public void setConcurrentTasks(int concurrentTasks) {
+        this.concurrentTasks = concurrentTasks;
+    }
+
     @Override
     public void onInit(Consumer consumer) {
         this.consumer = consumer;
@@ -106,34 +117,41 @@ public class DefaultScheduledPollConsumerScheduler 
extends org.apache.camel.supp
 
     @Override
     public void unscheduleTask() {
-        if (future != null) {
-            future.cancel(false);
+        if (isSchedulerStarted()) {
+            for (ScheduledFuture<?> future : futures) {
+                future.cancel(true);
+            }
+            futures.clear();
         }
     }
 
     @Override
     public void startScheduler() {
         // only schedule task if we have not already done that
-        if (future == null) {
+        if (futures.size() == 0) {
             if (isUseFixedDelay()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Scheduling poll (fixed delay) with 
initialDelay: {}, delay: {} ({}) for: {}",
                             new Object[]{getInitialDelay(), getDelay(), 
getTimeUnit().name().toLowerCase(Locale.ENGLISH), consumer.getEndpoint()});
                 }
-                future = scheduledExecutorService.scheduleWithFixedDelay(task, 
getInitialDelay(), getDelay(), getTimeUnit());
+                for (int i = 0; i < concurrentTasks; i++) {
+                    
futures.add(scheduledExecutorService.scheduleWithFixedDelay(task, 
getInitialDelay(), getDelay(), getTimeUnit()));
+                }
             } else {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Scheduling poll (fixed rate) with initialDelay: 
{}, delay: {} ({}) for: {}",
                             new Object[]{getInitialDelay(), getDelay(), 
getTimeUnit().name().toLowerCase(Locale.ENGLISH), consumer.getEndpoint()});
                 }
-                future = scheduledExecutorService.scheduleAtFixedRate(task, 
getInitialDelay(), getDelay(), getTimeUnit());
+                for (int i = 0; i < concurrentTasks; i++) {
+                    
futures.add(scheduledExecutorService.scheduleAtFixedRate(task, 
getInitialDelay(), getDelay(), getTimeUnit()));
+                }
             }
         }
     }
 
     @Override
     public boolean isSchedulerStarted() {
-        return future != null;
+        return futures != null && futures.size() > 0;
     }
 
     @Override
@@ -146,7 +164,7 @@ public class DefaultScheduledPollConsumerScheduler extends 
org.apache.camel.supp
         if (scheduledExecutorService == null) {
             // we only need one thread in the pool to schedule this task
             this.scheduledExecutorService = 
getCamelContext().getExecutorServiceManager()
-                    .newSingleThreadScheduledExecutor(consumer, 
consumer.getEndpoint().getEndpointUri());
+                    .newScheduledThreadPool(consumer, 
consumer.getEndpoint().getEndpointUri(), concurrentTasks);
             // and we should shutdown the thread pool when no longer needed
             this.shutdownExecutor = true;
         }
@@ -154,17 +172,20 @@ public class DefaultScheduledPollConsumerScheduler 
extends org.apache.camel.supp
 
     @Override
     protected void doStop() throws Exception {
-        if (future != null) {
-            LOG.debug("This consumer is stopping, so cancelling scheduled 
task: " + future);
-            future.cancel(true);
-            future = null;
+        if (isSchedulerStarted()) {
+            LOG.debug("This consumer is stopping, so cancelling scheduled 
task: " + futures);
+            for (ScheduledFuture<?> future : futures) {
+                future.cancel(true);
+            }
+            futures.clear();
         }
 
         if (shutdownExecutor && scheduledExecutorService != null) {
             
getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
             scheduledExecutorService = null;
-            future = null;
+            futures.clear();
         }
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b80021a1/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index 07a9ff2..da6f3b4 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -58,6 +58,7 @@ public class SqsConfiguration {
     private Integer defaultVisibilityTimeout;
     @UriParam(defaultValue = "false")
     private Boolean extendMessageVisibility = Boolean.FALSE;
+    private Integer concurrentConsumers = 1;
 
     // producer properties
     @UriParam
@@ -245,6 +246,14 @@ public class SqsConfiguration {
         this.region = region;
     }
 
+    public Integer getConcurrentConsumers() {
+        return concurrentConsumers;
+    }
+
+    public void setConcurrentConsumers(Integer concurrentConsumers) {
+        this.concurrentConsumers = concurrentConsumers;
+    }
+
     @Override
     public String toString() {
         return "SqsConfiguration[queueName=" + queueName
@@ -266,6 +275,7 @@ public class SqsConfiguration {
             + ", redrivePolicy=" + redrivePolicy
             + ", extendMessageVisibility=" + extendMessageVisibility
             + ", queueOwnerAWSAccountId=" + queueOwnerAWSAccountId
+            + ", concurrentConsumers=" + concurrentConsumers
             + ", region=" + region
             + "]";
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b80021a1/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
index d2cc213..4845dd0 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
@@ -38,6 +38,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler;
 import org.apache.camel.impl.ScheduledPollEndpoint;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
@@ -89,6 +90,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint 
implements HeaderFilterSt
         SqsConsumer sqsConsumer = new SqsConsumer(this, processor);
         configureConsumer(sqsConsumer);
         sqsConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
+        DefaultScheduledPollConsumerScheduler scheduler = new 
DefaultScheduledPollConsumerScheduler();
+        scheduler.setConcurrentTasks(configuration.getConcurrentConsumers());
+        sqsConsumer.setScheduler(scheduler);
         return sqsConsumer;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b80021a1/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConcurrentConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConcurrentConsumerTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConcurrentConsumerTest.java
new file mode 100644
index 0000000..9de5627
--- /dev/null
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConcurrentConsumerTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.services.sqs.model.Message;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+
+
+/**
+ * Created by ceposta
+ * <a href="http://christianposta.com/blog">http://christianposta.com/blog</a>.
+ */
+public class SqsConcurrentConsumerTest extends CamelTestSupport {
+    private static final int NUM_CONCURRENT = 10;
+    private static final int NUM_MESSAGES = 100;
+
+    final Set<Long> threadNumbers = new HashSet<Long>();
+
+    @Test
+    public void consumeMessagesFromQueue() throws Exception {
+        NotifyBuilder notifier = new 
NotifyBuilder(context).whenCompleted(NUM_MESSAGES).create();
+        assertTrue("We didn't process "
+                + NUM_MESSAGES
+                + " messages as we expected!", notifier.matches(5, 
TimeUnit.SECONDS));
+
+
+
+        // simple test to make sure all N concurrent consumers were used in 
the test
+        if (threadNumbers.size() != NUM_CONCURRENT) {
+            fail(String.format("We were expecting to have %d numbers of 
concurrent consumers, but only found %d",
+                    NUM_CONCURRENT, threadNumbers.size()));
+        }
+
+
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry reg = super.createRegistry();
+        AmazonSQSClientMock client = new AmazonSQSClientMock();
+        createDummyMessages(client, NUM_MESSAGES);
+        reg.bind("client", client);
+        return reg;
+    }
+
+    private void createDummyMessages(AmazonSQSClientMock client, int 
numMessages) {
+        for (int counter = 0; counter < numMessages; counter++) {
+            Message message = new Message();
+            message.setBody("Message " + counter);
+            message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
+            message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
+            message.setReceiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5");
+            client.messages.add(message);
+        }
+    }
+
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("aws-sqs://demo?concurrentConsumers=" + NUM_CONCURRENT + 
"&maxMessagesPerPoll=10&amazonSQSClient=#client")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws 
Exception {
+                                
threadNumbers.add(Thread.currentThread().getId());
+                            }
+                        }).log("processed a new message!");
+            }
+        };
+    }
+
+
+}

Reply via email to