Repository: camel Updated Branches: refs/heads/master 64d08a7e3 -> b80021a15
Fix for https://issues.apache.org/jira/browse/CAMEL-5113 Parallel and fault tolerant message processing for SQS endpoints Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b80021a1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b80021a1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b80021a1 Branch: refs/heads/master Commit: b80021a1551213e155c4ec8b1464831e9a6ab1d3 Parents: 64d08a7 Author: Christian Posta <christian.po...@gmail.com> Authored: Thu Jan 8 16:31:38 2015 -0700 Committer: Christian Posta <christian.po...@gmail.com> Committed: Thu Jan 8 16:31:38 2015 -0700 ---------------------------------------------------------------------- .../DefaultScheduledPollConsumerScheduler.java | 47 ++++++--- .../component/aws/sqs/SqsConfiguration.java | 10 ++ .../camel/component/aws/sqs/SqsEndpoint.java | 4 + .../aws/sqs/SqsConcurrentConsumerTest.java | 100 +++++++++++++++++++ 4 files changed, 148 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b80021a1/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java index 729ee75..db4e4d1 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java @@ -16,6 +16,8 @@ */ package org.apache.camel.impl; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -38,8 +40,9 @@ public class 
DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp private Consumer consumer; private ScheduledExecutorService scheduledExecutorService; private boolean shutdownExecutor; - private volatile ScheduledFuture<?> future; + private volatile List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(); private Runnable task; + private int concurrentTasks = 1; private long initialDelay = 1000; private long delay = 500; @@ -94,6 +97,14 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp this.scheduledExecutorService = scheduledExecutorService; } + public int getConcurrentTasks() { + return concurrentTasks; + } + + public void setConcurrentTasks(int concurrentTasks) { + this.concurrentTasks = concurrentTasks; + } + @Override public void onInit(Consumer consumer) { this.consumer = consumer; @@ -106,34 +117,41 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp @Override public void unscheduleTask() { - if (future != null) { - future.cancel(false); + if (isSchedulerStarted()) { + for (ScheduledFuture<?> future : futures) { + future.cancel(true); + } + futures.clear(); } } @Override public void startScheduler() { // only schedule task if we have not already done that - if (future == null) { + if (futures.size() == 0) { if (isUseFixedDelay()) { if (LOG.isDebugEnabled()) { LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}", new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), consumer.getEndpoint()}); } - future = scheduledExecutorService.scheduleWithFixedDelay(task, getInitialDelay(), getDelay(), getTimeUnit()); + for (int i = 0; i < concurrentTasks; i++) { + futures.add(scheduledExecutorService.scheduleWithFixedDelay(task, getInitialDelay(), getDelay(), getTimeUnit())); + } } else { if (LOG.isDebugEnabled()) { LOG.debug("Scheduling poll (fixed rate) with initialDelay: {}, delay: {} ({}) for: {}", new 
Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), consumer.getEndpoint()}); } - future = scheduledExecutorService.scheduleAtFixedRate(task, getInitialDelay(), getDelay(), getTimeUnit()); + for (int i = 0; i < concurrentTasks; i++) { + futures.add(scheduledExecutorService.scheduleAtFixedRate(task, getInitialDelay(), getDelay(), getTimeUnit())); + } } } } @Override public boolean isSchedulerStarted() { - return future != null; + return futures != null && futures.size() > 0; } @Override @@ -146,7 +164,7 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp if (scheduledExecutorService == null) { // we only need one thread in the pool to schedule this task this.scheduledExecutorService = getCamelContext().getExecutorServiceManager() - .newSingleThreadScheduledExecutor(consumer, consumer.getEndpoint().getEndpointUri()); + .newScheduledThreadPool(consumer, consumer.getEndpoint().getEndpointUri(), concurrentTasks); // and we should shutdown the thread pool when no longer needed this.shutdownExecutor = true; } @@ -154,17 +172,20 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp @Override protected void doStop() throws Exception { - if (future != null) { - LOG.debug("This consumer is stopping, so cancelling scheduled task: " + future); - future.cancel(true); - future = null; + if (isSchedulerStarted()) { + LOG.debug("This consumer is stopping, so cancelling scheduled task: " + futures); + for (ScheduledFuture<?> future : futures) { + future.cancel(true); + } + futures.clear(); } if (shutdownExecutor && scheduledExecutorService != null) { getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService); scheduledExecutorService = null; - future = null; + futures.clear(); } } + } http://git-wip-us.apache.org/repos/asf/camel/blob/b80021a1/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java 
---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java index 07a9ff2..da6f3b4 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java @@ -58,6 +58,7 @@ public class SqsConfiguration { private Integer defaultVisibilityTimeout; @UriParam(defaultValue = "false") private Boolean extendMessageVisibility = Boolean.FALSE; + private Integer concurrentConsumers = 1; // producer properties @UriParam @@ -245,6 +246,14 @@ public class SqsConfiguration { this.region = region; } + public Integer getConcurrentConsumers() { + return concurrentConsumers; + } + + public void setConcurrentConsumers(Integer concurrentConsumers) { + this.concurrentConsumers = concurrentConsumers; + } + @Override public String toString() { return "SqsConfiguration[queueName=" + queueName @@ -266,6 +275,7 @@ public class SqsConfiguration { + ", redrivePolicy=" + redrivePolicy + ", extendMessageVisibility=" + extendMessageVisibility + ", queueOwnerAWSAccountId=" + queueOwnerAWSAccountId + + ", concurrentConsumers=" + concurrentConsumers + ", region=" + region + "]"; } http://git-wip-us.apache.org/repos/asf/camel/blob/b80021a1/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java index d2cc213..4845dd0 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java +++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java @@ -38,6 +38,7 @@ import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler; import org.apache.camel.impl.ScheduledPollEndpoint; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.HeaderFilterStrategyAware; @@ -89,6 +90,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt SqsConsumer sqsConsumer = new SqsConsumer(this, processor); configureConsumer(sqsConsumer); sqsConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll); + DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(); + scheduler.setConcurrentTasks(configuration.getConcurrentConsumers()); + sqsConsumer.setScheduler(scheduler); return sqsConsumer; } http://git-wip-us.apache.org/repos/asf/camel/blob/b80021a1/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConcurrentConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConcurrentConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConcurrentConsumerTest.java new file mode 100644 index 0000000..9de5627 --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConcurrentConsumerTest.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.sqs; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import com.amazonaws.services.sqs.model.Message; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + + + +/** + * Created by ceposta + * <a href="http://christianposta.com/blog">http://christianposta.com/blog</a>. 
+ */ +public class SqsConcurrentConsumerTest extends CamelTestSupport { + private static final int NUM_CONCURRENT = 10; + private static final int NUM_MESSAGES = 100; + + final Set<Long> threadNumbers = new HashSet<Long>(); + + @Test + public void consumeMessagesFromQueue() throws Exception { + NotifyBuilder notifier = new NotifyBuilder(context).whenCompleted(NUM_MESSAGES).create(); + assertTrue("We didn't process " + + NUM_MESSAGES + + " messages as we expected!", notifier.matches(5, TimeUnit.SECONDS)); + + + + // simple test to make sure all N concurrent consumers were used in the test + if (threadNumbers.size() != NUM_CONCURRENT) { + fail(String.format("We were expecting to have %d numbers of concurrent consumers, but only found %d", + NUM_CONCURRENT, threadNumbers.size())); + } + + + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry reg = super.createRegistry(); + AmazonSQSClientMock client = new AmazonSQSClientMock(); + createDummyMessages(client, NUM_MESSAGES); + reg.bind("client", client); + return reg; + } + + private void createDummyMessages(AmazonSQSClientMock client, int numMessages) { + for (int counter = 0; counter < numMessages; counter++) { + Message message = new Message(); + message.setBody("Message " + counter); + message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee"); + message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458"); + message.setReceiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5"); + client.messages.add(message); + } + } + + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("aws-sqs://demo?concurrentConsumers=" + NUM_CONCURRENT + "&maxMessagesPerPoll=10&amazonSQSClient=#client") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + threadNumbers.add(Thread.currentThread().getId()); + } + }).log("processed a new message!"); 
+ } + }; + } + + +}