Author: ningjiang Date: Mon Feb 25 11:22:02 2013 New Revision: 1449660 URL: http://svn.apache.org/r1449660 Log: CAMEL-6096 Add support for setting the SQS delay seconds through a message header, with thanks to Andrew
Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java?rev=1449660&r1=1449659&r2=1449660&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java Mon Feb 25 11:22:02 2013 @@ -26,4 +26,5 @@ public interface SqsConstants { String MD5_OF_BODY = "CamelAwsSqsMD5OfBody"; String MESSAGE_ID = "CamelAwsSqsMessageId"; String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle"; + String DELAY_HEADER = "CamelAwsSqsDelaySeconds"; } \ No newline at end of file Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java?rev=1449660&r1=1449659&r2=1449660&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java Mon Feb 25 11:22:02 2013 @@ -44,7 +44,7 @@ public class SqsProducer extends Default public void process(Exchange exchange) throws Exception { String body = exchange.getIn().getBody(String.class); SendMessageRequest 
request = new SendMessageRequest(getQueueUrl(), body); - request.setDelaySeconds(getEndpoint().getConfiguration().getDelaySeconds()); + addDelay(request, exchange); LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange); @@ -57,6 +57,20 @@ public class SqsProducer extends Default message.setHeader(SqsConstants.MD5_OF_BODY, result.getMD5OfMessageBody()); } + private void addDelay(SendMessageRequest request, Exchange exchange) { + Integer headerValue = exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class); + Integer delayValue = Integer.valueOf(0); + if (headerValue == null) { + LOG.trace("Using the config delay"); + delayValue = getEndpoint().getConfiguration().getDelaySeconds(); + } else { + LOG.trace("Using the header delay"); + delayValue = headerValue; + } + LOG.trace("found delay: " + delayValue); + request.setDelaySeconds(delayValue == null ? Integer.valueOf(0) : delayValue); + } + private Message getMessageForResponse(Exchange exchange) { if (exchange.getPattern().isOutCapable()) { Message out = exchange.getOut(); Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java?rev=1449660&view=auto ============================================================================== --- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java (added) +++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java Mon Feb 25 11:22:02 2013 @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.sqs; + +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageResult; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SqsProducerTest { + private static final String SAMPLE_MESSAGE_BODY = "this is a body"; + private static final String MESSAGE_MD5 = "00000000000000000000000000000000"; + private static final String MESSAGE_ID = "11111111111111111111111111111111"; + private static final String QUEUE_URL = "some://queue/url"; + + Exchange exchange = mock(Exchange.class, RETURNS_DEEP_STUBS); + + @Mock private SqsEndpoint sqsEndpoint; + @Mock private AmazonSQSClient amazonSQSClient; + @Mock private Message outMessage; + @Mock private Message inMessage; + + private 
SendMessageResult sendMessageResult; + private SqsConfiguration sqsConfiguration; + + private SqsProducer underTest; + + @Before + public void setup() throws Exception { + underTest = new SqsProducer(sqsEndpoint); + sendMessageResult = new SendMessageResult().withMD5OfMessageBody(MESSAGE_MD5).withMessageId(MESSAGE_ID); + sqsConfiguration = new SqsConfiguration(); + sqsConfiguration.setDelaySeconds(Integer.valueOf(0)); + when(sqsEndpoint.getClient()).thenReturn(amazonSQSClient); + when(sqsEndpoint.getConfiguration()).thenReturn(sqsConfiguration); + when(amazonSQSClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendMessageResult); + when(exchange.getOut()).thenReturn(outMessage); + when(exchange.getIn()).thenReturn(inMessage); + when(exchange.getPattern()).thenReturn(ExchangePattern.InOnly); + when(inMessage.getBody(String.class)).thenReturn(SAMPLE_MESSAGE_BODY); + when(sqsEndpoint.getQueueUrl()).thenReturn(QUEUE_URL); + } + + @Test + public void itSendsTheBodyFromAnExchange() throws Exception { + underTest.process(exchange); + + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + assertEquals(SAMPLE_MESSAGE_BODY, capture.getValue().getMessageBody()); + } + + @Test + public void itSendsTheCorrectQueueUrl() throws Exception { + underTest.process(exchange); + + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + assertEquals(QUEUE_URL, capture.getValue().getQueueUrl()); + } + + @Test + public void itSetsTheDelayFromTheConfigurationOnTheRequest() throws Exception { + sqsConfiguration.setDelaySeconds(Integer.valueOf(9001)); + underTest.process(exchange); + + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + assertEquals(9001, 
capture.getValue().getDelaySeconds().intValue()); + } + + @Test + public void itSetsTheDelayFromMessageHeaderOnTheRequest() throws Exception { + when(inMessage.getHeader(SqsConstants.DELAY_HEADER, Integer.class)).thenReturn(Integer.valueOf(2000)); + underTest.process(exchange); + + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + assertEquals(2000, capture.getValue().getDelaySeconds().intValue()); + } + + @Test + public void itSetsTheMessageIdOnTheExchangeMessage() throws Exception { + underTest.process(exchange); + verify(inMessage).setHeader(SqsConstants.MESSAGE_ID, MESSAGE_ID); + } + + @Test + public void itSetsTheMd5SumOnTheExchangeMessage() throws Exception { + underTest.process(exchange); + verify(inMessage).setHeader(SqsConstants.MD5_OF_BODY, MESSAGE_MD5); + } + +} \ No newline at end of file