CAMEL-10953 add message attribute support to sns producer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/692ecaad Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/692ecaad Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/692ecaad Branch: refs/heads/master Commit: 692ecaad6e44fa4528a09464c6c230299d13cbb8 Parents: cace0ee Author: Peter van Gestel <peter.van.ges...@osudio.com> Authored: Mon Mar 6 19:23:33 2017 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Mar 7 17:18:43 2017 +0100 ---------------------------------------------------------------------- .../camel/component/aws/sns/SnsEndpoint.java | 26 ++++++++--- .../aws/sns/SnsHeaderFilterStrategy.java | 30 +++++++++++++ .../camel/component/aws/sns/SnsProducer.java | 45 ++++++++++++++++++-- 3 files changed, 92 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/692ecaad/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java index ac7384b..b9276da 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java @@ -33,11 +33,9 @@ import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.component.aws.sqs.SqsHeaderFilterStrategy; import org.apache.camel.impl.DefaultEndpoint; -import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.UriEndpoint; -import org.apache.camel.spi.UriParam; -import org.apache.camel.spi.UriPath; +import org.apache.camel.spi.*; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +45,7 @@ import org.slf4j.LoggerFactory; */ @UriEndpoint(firstVersion = "2.8.0", scheme = "aws-sns", title = "AWS Simple Notification System", syntax = "aws-sns:topicNameOrArn", producerOnly = true, label = "cloud,mobile,messaging") -public class SnsEndpoint extends DefaultEndpoint { +public class SnsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { private static final Logger LOG = LoggerFactory.getLogger(SnsEndpoint.class); @@ -58,6 +56,8 @@ public class SnsEndpoint extends DefaultEndpoint { private String topicNameOrArn; // to support component docs @UriParam private SnsConfiguration configuration; + @UriParam + private HeaderFilterStrategy headerFilterStrategy; @Deprecated public SnsEndpoint(String uri, CamelContext context, SnsConfiguration configuration) { @@ -69,6 +69,17 @@ public class SnsEndpoint extends DefaultEndpoint { this.configuration = configuration; } + public HeaderFilterStrategy getHeaderFilterStrategy() { + return headerFilterStrategy; + } + + /** + * To use a custom HeaderFilterStrategy to map headers to/from Camel. + */ + public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) { + this.headerFilterStrategy = strategy; + } + public Consumer createConsumer(Processor processor) throws Exception { throw new UnsupportedOperationException("You cannot receive messages from this endpoint"); } @@ -92,6 +103,11 @@ public class SnsEndpoint extends DefaultEndpoint { LOG.trace("Updating the SNS region with : {} " + configuration.getAmazonSNSEndpoint()); snsClient.setEndpoint(configuration.getAmazonSNSEndpoint()); } + + // check the setting the headerFilterStrategy + if (headerFilterStrategy == null) { + headerFilterStrategy = new SqsHeaderFilterStrategy(); + } if (configuration.getTopicArn() == null) { try { http://git-wip-us.apache.org/repos/asf/camel/blob/692ecaad/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java new file mode 100644 index 0000000..fb51835 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.sns; + +import org.apache.camel.impl.DefaultHeaderFilterStrategy; + +public class SnsHeaderFilterStrategy extends DefaultHeaderFilterStrategy { + public SnsHeaderFilterStrategy() { + initialize(); + } + + protected void initialize() { + // filter headers begin with "Camel" or "org.apache.camel" + setOutFilterPattern("(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/692ecaad/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java index 5781155..69c11f7 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java @@ -16,17 +16,23 @@ */ package org.apache.camel.component.aws.sns; -import com.amazonaws.services.sns.model.PublishRequest; -import com.amazonaws.services.sns.model.PublishResult; +import com.amazonaws.services.sns.model.*; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.util.URISupport; +import org.apache.camel.spi.HeaderFilterStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; @@ -51,7 +57,8 @@ public class SnsProducer extends DefaultProducer { request.setSubject(determineSubject(exchange)); request.setMessageStructure(determineMessageStructure(exchange)); request.setMessage(exchange.getIn().getBody(String.class)); - + request.setMessageAttributes(this.translateAttributes(exchange.getIn().getHeaders(), exchange)); + LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange); PublishResult result = getEndpoint().getSNSClient().publish(request); @@ -79,7 +86,35 @@ public class SnsProducer extends DefaultProducer { return structure; } - + private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers, Exchange exchange) { + HashMap result = new HashMap(); + HeaderFilterStrategy headerFilterStrategy = this.getEndpoint().getHeaderFilterStrategy(); + Iterator var5 = headers.entrySet().iterator(); + + while(var5.hasNext()) { + Entry entry = (Entry)var5.next(); + if(!headerFilterStrategy.applyFilterToCamelHeaders((String)entry.getKey(), entry.getValue(), exchange)) { + Object value = entry.getValue(); + MessageAttributeValue mav; + if(value instanceof String) { + mav = new MessageAttributeValue(); + mav.setDataType("String"); + mav.withStringValue((String)value); + result.put(entry.getKey(), mav); + } else if(value instanceof ByteBuffer) { + mav = new MessageAttributeValue(); + mav.setDataType("Binary"); + mav.withBinaryValue((ByteBuffer)value); + result.put(entry.getKey(), mav); + } else { + LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(), entry.getValue()); + } + } + } + + return result; + } + protected SnsConfiguration getConfiguration() { return getEndpoint().getConfiguration(); } @@ -96,4 +131,6 @@ public class SnsProducer extends DefaultProducer { public SnsEndpoint getEndpoint() { return (SnsEndpoint) super.getEndpoint(); } + + } \ No newline at end of file