Updated Branches: refs/heads/camel-2.12.x ddc6b43c1 -> 8257f46be refs/heads/master e76d870de -> a562c867e
CAMEL-6850: Allow AWS SQS to not ack a message even if it doesn't encounter an exception. Thanks to Christian Posta for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a562c867 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a562c867 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a562c867 Branch: refs/heads/master Commit: a562c867ed2794fc1934709a3777dfeed69aff8f Parents: e76d870 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Oct 11 12:32:41 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Oct 11 12:32:41 2013 +0200 ---------------------------------------------------------------------- .../component/aws/sqs/SqsConfiguration.java | 10 ++ .../camel/component/aws/sqs/SqsConsumer.java | 19 ++- .../component/aws/sqs/AmazonSQSClientMock.java | 67 +++++++++- .../sqs/SqsFilterMessagesWithNoDeleteTest.java | 127 +++++++++++++++++++ 4 files changed, 214 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a562c867/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java index e306221..87f102c 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java @@ -36,6 +36,7 @@ public class SqsConfiguration { // consumer properties private Boolean deleteAfterRead = Boolean.TRUE; + private Boolean deleteIfFiltered = Boolean.TRUE; private Integer visibilityTimeout; private Collection<String> attributeNames; private Integer 
waitTimeSeconds; @@ -187,6 +188,14 @@ public class SqsConfiguration { this.queueOwnerAWSAccountId = queueOwnerAWSAccountId; } + public Boolean isDeleteIfFiltered() { + return deleteIfFiltered; + } + + public void setDeleteIfFiltered(Boolean deleteIfFiltered) { + this.deleteIfFiltered = deleteIfFiltered; + } + @Override public String toString() { return "SqsConfiguration[queueName=" + queueName @@ -194,6 +203,7 @@ public class SqsConfiguration { + ", accessKey=" + accessKey + ", secretKey=xxxxxxxxxxxxxxx" + ", deleteAfterRead=" + deleteAfterRead + + ", deleteIfFiltered=" + deleteIfFiltered + ", visibilityTimeout=" + visibilityTimeout + ", attributeNames=" + attributeNames + ", waitTimeSeconds=" + waitTimeSeconds http://git-wip-us.apache.org/repos/asf/camel/blob/a562c867/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java index 4fae7d4..4c76c15 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java @@ -180,12 +180,13 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { */ protected void processCommit(Exchange exchange) { try { - if (getConfiguration().isDeleteAfterRead()) { + + if (shouldDelete(exchange)) { String receiptHandle = exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class); DeleteMessageRequest deleteRequest = new DeleteMessageRequest(getQueueUrl(), receiptHandle); - + LOG.trace("Deleting message with receipt handle {}...", receiptHandle); - + getClient().deleteMessage(deleteRequest); LOG.trace("Deleted message with receipt handle {}...", receiptHandle); @@ -195,6 +196,18 @@ public class SqsConsumer 
extends ScheduledBatchPollingConsumer { } } + private boolean shouldDelete(Exchange exchange) { + return getConfiguration().isDeleteAfterRead() + && (getConfiguration().isDeleteIfFiltered() + || (!getConfiguration().isDeleteIfFiltered() + && passedThroughFilter(exchange))); + } + + private boolean passedThroughFilter(Exchange exchange) { + return exchange.getProperties().containsKey(Exchange.FILTER_MATCHED) + && ((Boolean) exchange.getProperties().get(Exchange.FILTER_MATCHED)); + } + /** * Strategy when processing the exchange failed. * http://git-wip-us.apache.org/repos/asf/camel/blob/a562c867/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java index ac0838f..4d227a7 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java @@ -20,9 +20,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; @@ -41,11 +46,14 @@ import com.amazonaws.services.sqs.model.SendMessageResult; import com.amazonaws.services.sqs.model.SetQueueAttributesRequest; public class AmazonSQSClientMock extends AmazonSQSClient { - + List<Message> messages = new ArrayList<Message>(); 
Map<String, Map<String, String>> queueAttributes = new HashMap<String, Map<String, String>>(); List<ChangeMessageVisibilityRequest> changeMessageVisibilityRequests = new CopyOnWriteArrayList<ChangeMessageVisibilityRequest>(); - + private Map<String, CreateQueueRequest> queues = new LinkedHashMap<String, CreateQueueRequest>(); + private Map<String, ScheduledFuture> inFlight = new LinkedHashMap<String, ScheduledFuture>(); + private ScheduledExecutorService scheduler; + public AmazonSQSClientMock() { super(new BasicAWSCredentials("myAccessKey", "mySecretKey")); } @@ -58,8 +66,10 @@ public class AmazonSQSClientMock extends AmazonSQSClient { @Override public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException { + String queueName = "https://queue.amazonaws.com/541925086079/" + createQueueRequest.getQueueName(); + queues.put(queueName, createQueueRequest); CreateQueueResult result = new CreateQueueResult(); - result.setQueueUrl("https://queue.amazonaws.com/541925086079/MyQueue"); + result.setQueueUrl(queueName); return result; } @@ -91,8 +101,10 @@ public class AmazonSQSClientMock extends AmazonSQSClient { synchronized (messages) { int fetchSize = 0; for (Iterator<Message> iterator = messages.iterator(); iterator.hasNext() && fetchSize < maxNumberOfMessages; fetchSize++) { - resultMessages.add(iterator.next()); + Message rc = iterator.next(); + resultMessages.add(rc); iterator.remove(); + scheduleCancelInflight(receiveMessageRequest.getQueueUrl(), rc); } } @@ -100,9 +112,52 @@ public class AmazonSQSClientMock extends AmazonSQSClient { return result; } + /* + * Cancel (put back onto queue) in flight messages if the visibility time has expired + * and has not been manually deleted (ack'd) + */ + private void scheduleCancelInflight(final String queueUrl, final Message message) { + if (scheduler != null) { + int visibility = getVisibilityForQueue(queueUrl); + if (visibility > 0) { + ScheduledFuture task = 
scheduler.schedule(new Runnable() { + @Override + public void run() { + synchronized (messages) { + // put it back! + messages.add(message); + } + } + }, visibility, TimeUnit.SECONDS); + + inFlight.put(message.getReceiptHandle(), task); + } + } + } + + private int getVisibilityForQueue(String queueUrl) { + Map<String, String> queueAttr = queues.get(queueUrl).getAttributes(); + if (queueAttr.containsKey("VisibilityTimeout")) { + return Integer.parseInt(queueAttr.get("VisibilityTimeout")); + } + return 0; + } + + public ScheduledExecutorService getScheduler() { + return scheduler; + } + + public void setScheduler(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + } + @Override - public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonServiceException, AmazonClientException { - // noop + public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonClientException { + String receiptHandle = deleteMessageRequest.getReceiptHandle(); + if (inFlight.containsKey(receiptHandle)) { + ScheduledFuture inFlightTask = inFlight.get(receiptHandle); + inFlightTask.cancel(true); + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/a562c867/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java new file mode 100644 index 0000000..2c72d7e --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.sqs; + +import java.util.concurrent.TimeUnit; + +import com.amazonaws.services.sqs.model.Message; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + + +public class SqsFilterMessagesWithNoDeleteTest extends CamelTestSupport { + + + // put some test messages onto the 'queue' + private void populateMessages(AmazonSQSClientMock clientMock) { + Message message = new Message(); + message.setBody("Message: hello, world!"); + message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee"); + message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458"); + message.setReceiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5"); + + clientMock.messages.add(message); + } + + @Test + public void testDoesNotGetThroughFilter() throws Exception { + + final String sqsURI = String.format("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient" + // note we will NOT delete if this message gets filtered out + + "&deleteIfFiltered=false" + + "&defaultVisibilityTimeout=1"); + + AmazonSQSClientMock clientMock = new AmazonSQSClientMock(); + 
populateMessages(clientMock); + JndiRegistry registry = new JndiRegistry(createJndiContext()); + + DefaultCamelContext ctx = new DefaultCamelContext(registry); + ctx.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from(sqsURI) + // try to filter using a non-existent header... should not go through + .filter(simple("${header.login} == true")) + .to("mock:result"); + + } + }); + MockEndpoint result = MockEndpoint.resolve(ctx, "mock:result"); + clientMock.setScheduler(ctx.getExecutorServiceManager().newScheduledThreadPool(clientMock, "ClientMock Scheduler", 1)); + registry.bind("amazonSQSClient", clientMock); + + result.expectedMessageCount(0); + + ctx.start(); + + // we shouldn't get any message at mock:result, since the filter rejects it + result.assertIsSatisfied(1000, TimeUnit.MILLISECONDS); + + + // however, the message should not be deleted, that is, it should be left on the queue + String response = ctx.createConsumerTemplate().receiveBody(sqsURI, 5000, String.class); + + assertEquals(response, "Message: hello, world!"); + + } + + @Test + public void testGetThroughFilter() throws Exception { + final String sqsURI = String.format("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient" + // note we will NOT delete if this message gets filtered out, but if it goes + // through filter, it should be deleted! + + "&deleteIfFiltered=false" + + "&defaultVisibilityTimeout=1"); + + AmazonSQSClientMock clientMock = new AmazonSQSClientMock(); + populateMessages(clientMock); + JndiRegistry registry = new JndiRegistry(createJndiContext()); + + DefaultCamelContext ctx = new DefaultCamelContext(registry); + ctx.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from(sqsURI) + .setHeader("login", constant(true)) + + // this filter should allow the message to pass.. 
+ .filter(simple("${header.login} == true")) + .to("mock:result"); + + } + }); + MockEndpoint result = MockEndpoint.resolve(ctx, "mock:result"); + registry.bind("amazonSQSClient", clientMock); + clientMock.setScheduler(ctx.getExecutorServiceManager().newScheduledThreadPool(clientMock, "ClientMock Scheduler", 1)); + + result.expectedMessageCount(1); + ctx.start(); + + // the message should get through filter and mock should assert this + result.assertIsSatisfied(1000, TimeUnit.MILLISECONDS); + + // since it passed the filter, the message should have been deleted, that is, nothing should be left on the queue + String response = ctx.createConsumerTemplate().receiveBody(sqsURI, 5000, String.class); + + assertNull(response); + } + +}