CAMEL-7654 Support message attributes for the SqsProducer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2ca8187c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2ca8187c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2ca8187c Branch: refs/heads/master Commit: 2ca8187cf079b011b7f4438d803cd51c96378dc9 Parents: e7c9e40 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Wed Aug 13 16:58:21 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Wed Aug 13 17:49:50 2014 +0800 ---------------------------------------------------------------------- .../camel/component/aws/sqs/SqsEndpoint.java | 18 +++++ .../camel/component/aws/sqs/SqsProducer.java | 24 +++++++ .../component/aws/sqs/SqsProducerTest.java | 72 ++++++++++++++++++++ 3 files changed, 114 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2ca8187c/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java index 6447d7b..b48fbe6 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java @@ -17,6 +17,7 @@ package org.apache.camel.component.aws.sqs; import java.util.HashMap; +import java.util.Map.Entry; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.BasicAWSCredentials; @@ -27,6 +28,7 @@ import com.amazonaws.services.sqs.model.CreateQueueResult; import com.amazonaws.services.sqs.model.GetQueueUrlRequest; import com.amazonaws.services.sqs.model.GetQueueUrlResult; import com.amazonaws.services.sqs.model.ListQueuesResult; +import 
com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.services.sqs.model.QueueAttributeName; import com.amazonaws.services.sqs.model.SetQueueAttributesRequest; @@ -188,6 +190,12 @@ public class SqsEndpoint extends ScheduledPollEndpoint { message.setHeader(SqsConstants.RECEIPT_HANDLE, msg.getReceiptHandle()); message.setHeader(SqsConstants.ATTRIBUTES, msg.getAttributes()); message.setHeader(SqsConstants.MESSAGE_ATTRIBUTES, msg.getMessageAttributes()); + + //add all sqs message attributes as camel message headers so that knowledge of + //the Sqs class MessageAttributeValue will not leak to the client + for (Entry<String, MessageAttributeValue> entry : msg.getMessageAttributes().entrySet()) { + message.setHeader(entry.getKey(), translateValue(entry.getValue())); + } return exchange; } @@ -231,4 +239,14 @@ public class SqsEndpoint extends ScheduledPollEndpoint { public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { this.maxMessagesPerPoll = maxMessagesPerPoll; } + + private Object translateValue(MessageAttributeValue mav) { + Object result = null; + if (mav.getStringValue() != null) { + result = mav.getStringValue(); + } else if (mav.getBinaryValue() != null) { + result = mav.getBinaryValue(); + } + return result; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/2ca8187c/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java index b4fa72b..0a4c948 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java @@ -16,7 +16,13 @@ */ package org.apache.camel.component.aws.sqs; +import java.nio.ByteBuffer; +import 
java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; @@ -44,6 +50,7 @@ public class SqsProducer extends DefaultProducer { public void process(Exchange exchange) throws Exception { String body = exchange.getIn().getBody(String.class); SendMessageRequest request = new SendMessageRequest(getQueueUrl(), body); + request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders())); addDelay(request, exchange); LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange); @@ -98,4 +105,21 @@ public class SqsProducer extends DefaultProducer { public String toString() { return "SqsProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]"; } + + private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers) { + Map<String, MessageAttributeValue> result = new HashMap<String, MessageAttributeValue>(); + for (Entry<String, Object> entry : headers.entrySet()) { + Object value = entry.getValue(); + MessageAttributeValue mav = new MessageAttributeValue(); + if (value instanceof String) { + mav.setDataType("String"); + mav.withStringValue((String)value); + } else if (value instanceof ByteBuffer) { + mav.setDataType("Binary"); + mav.withBinaryValue((ByteBuffer)value); + } + result.put(entry.getKey(), mav); + } + return result; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/2ca8187c/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java index efc4f0e..e6742ca 
100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java @@ -16,6 +16,10 @@ */ package org.apache.camel.component.aws.sqs; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + import com.amazonaws.services.sqs.AmazonSQSClient; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; @@ -31,6 +35,7 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; @@ -43,6 +48,12 @@ public class SqsProducerTest { private static final String MESSAGE_MD5 = "00000000000000000000000000000000"; private static final String MESSAGE_ID = "11111111111111111111111111111111"; private static final String QUEUE_URL = "some://queue/url"; + private static final String SAMPLE_MESSAGE_HEADER_NAME_1 = "header_name_1"; + private static final String SAMPLE_MESSAGE_HEADER_VALUE_1 = "heder_value_1"; + private static final String SAMPLE_MESSAGE_HEADER_NAME_2 = "header_name_2"; + private static final ByteBuffer SAMPLE_MESSAGE_HEADER_VALUE_2 = ByteBuffer.wrap(new byte[10]); + private static final String SAMPLE_MESSAGE_HEADER_NAME_3 = "header_name_3"; + private static final String SAMPLE_MESSAGE_HEADER_VALUE_3 = "heder_value_3"; Exchange exchange = mock(Exchange.class, RETURNS_DEEP_STUBS); @@ -121,5 +132,66 @@ public class SqsProducerTest { underTest.process(exchange); verify(inMessage).setHeader(SqsConstants.MD5_OF_BODY, MESSAGE_MD5); } + + @Test + public void isAttributeMessageStringHeaderOnTheRequest() throws Exception { + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(SAMPLE_MESSAGE_HEADER_NAME_1, 
SAMPLE_MESSAGE_HEADER_VALUE_1); + when(inMessage.getHeaders()).thenReturn(headers); + underTest.process(exchange); + + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + + assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_1, + capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_1) + .getStringValue()); + assertNull(capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_1) + .getBinaryValue()); + } + + @Test + public void isAttributeMessageByteBufferHeaderOnTheRequest() throws Exception { + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(SAMPLE_MESSAGE_HEADER_NAME_2, SAMPLE_MESSAGE_HEADER_VALUE_2); + when(inMessage.getHeaders()).thenReturn(headers); + underTest.process(exchange); + + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + + assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_2, + capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_2) + .getBinaryValue()); + assertNull(capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_2) + .getStringValue()); + } + + @Test + public void isAllAttributeMessagesOnTheRequest() throws Exception { + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(SAMPLE_MESSAGE_HEADER_NAME_1, SAMPLE_MESSAGE_HEADER_VALUE_1); + headers.put(SAMPLE_MESSAGE_HEADER_NAME_2, SAMPLE_MESSAGE_HEADER_VALUE_2); + headers.put(SAMPLE_MESSAGE_HEADER_NAME_3, SAMPLE_MESSAGE_HEADER_VALUE_3); + when(inMessage.getHeaders()).thenReturn(headers); + underTest.process(exchange); + + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + + assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_1, + 
capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_1) + .getStringValue()); + assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_2, + capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_2) + .getBinaryValue()); + assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_3, + capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_3) + .getStringValue()); + assertEquals(3, capture.getValue().getMessageAttributes().size()); + } + + + } \ No newline at end of file