CAMEL-7105 Added ability to auto reconnect for sqs queues with thanks to Adrian
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a5fd4298 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a5fd4298 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a5fd4298 Branch: refs/heads/camel-2.12.x Commit: a5fd429880e7d12444d01be6d22aeebdaa455418 Parents: 4293f06 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Mon Jan 6 17:11:04 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Mon Jan 6 17:25:02 2014 +0800 ---------------------------------------------------------------------- .../camel/component/aws/sqs/SqsConsumer.java | 28 +++++++++++++++++++- .../camel/component/aws/sqs/SqsEndpoint.java | 4 +-- 2 files changed, 29 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a5fd4298/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java index 1163743..7daa8a8 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java @@ -29,6 +29,8 @@ import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest; import com.amazonaws.services.sqs.model.DeleteMessageRequest; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.MessageNotInflightException; +import com.amazonaws.services.sqs.model.QueueDeletedRecentlyException; +import com.amazonaws.services.sqs.model.QueueDoesNotExistException; import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; @@ -46,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * A Consumer of messages from the Amazon Web Service Simple Queue Service * <a href="http://aws.amazon.com/sqs/">AWS SQS</a> @@ -74,7 +77,14 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { LOG.trace("Receiving messages with request [{}]...", request); - ReceiveMessageResult messageResult = getClient().receiveMessage(request); + ReceiveMessageResult messageResult = null; + try { + messageResult = getClient().receiveMessage(request); + } catch (QueueDoesNotExistException e) { + LOG.info("Queue does not exist....recreating now..."); + reConnectToQueue(); + messageResult = getClient().receiveMessage(request); + } if (LOG.isTraceEnabled()) { LOG.trace("Received {} messages", messageResult.getMessages().size()); @@ -83,6 +93,22 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { Queue<Exchange> exchanges = createExchanges(messageResult.getMessages()); return processBatch(CastUtils.cast(exchanges)); } + + public void reConnectToQueue() { + try { + getEndpoint().createQueue(getClient()); + } catch (QueueDeletedRecentlyException qdr) { + LOG.debug("Queue recently deleted, will retry in 30 seconds."); + try { + Thread.sleep(30000); + getEndpoint().createQueue(getClient()); + } catch (Exception e) { + LOG.error("failed to retry queue connection.", e); + } + } catch (Exception e) { + LOG.error("Could not connect to queue in amazon.", e); + } + } protected Queue<Exchange> createExchanges(List<Message> messages) { if (LOG.isTraceEnabled()) { http://git-wip-us.apache.org/repos/asf/camel/blob/a5fd4298/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java index ca3ff0a..aa01c72 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java @@ -110,7 +110,7 @@ public class SqsEndpoint extends ScheduledPollEndpoint { } } - private void createQueue(AmazonSQS client) { + protected void createQueue(AmazonSQS client) { LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName()); // creates a new queue, or returns the URL of an existing one @@ -220,7 +220,7 @@ public class SqsEndpoint extends ScheduledPollEndpoint { protected String getQueueUrl() { return queueUrl; } - + public int getMaxMessagesPerPoll() { return maxMessagesPerPoll; }