This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 86edc1fa045f954d47e60dd0c17b5d0157afe99e Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Fri Jan 12 13:49:38 2018 +0100 CAMEL-12139 - Camel-AWS SQS: Add the ability to specify credentials and region at component level --- .../camel-aws/src/main/docs/aws-sqs-component.adoc | 14 +- .../camel/component/aws/sqs/SqsComponent.java | 67 +++- .../camel/component/aws/sqs/SqsConfiguration.java | 16 +- .../aws/sqs/SqsComponentConfigurationTest.java | 31 ++ .../sqs/springboot/SqsComponentConfiguration.java | 372 +++++++++++++++++++++ 5 files changed, 497 insertions(+), 3 deletions(-) diff --git a/components/camel-aws/src/main/docs/aws-sqs-component.adoc b/components/camel-aws/src/main/docs/aws-sqs-component.adoc index 03dc78c..185f861 100644 --- a/components/camel-aws/src/main/docs/aws-sqs-component.adoc +++ b/components/camel-aws/src/main/docs/aws-sqs-component.adoc @@ -26,7 +26,19 @@ The queue will be created if they don't already exists. + // component options: START -The AWS Simple Queue Service component has no options. +The AWS Simple Queue Service component supports 5 options which are listed below. + + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *configuration* (advanced) | The AWS SQS default configuration | | SqsConfiguration +| *accessKey* (common) | Amazon AWS Access Key | | String +| *secretKey* (common) | Amazon AWS Secret Key | | String +| *region* (common) | Specify the queue region which could be used with queueOwnerAWSAccountId to build the service URL. | | String +| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean +|=== // component options: END diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java index 06da6f8..e21610b 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java @@ -21,10 +21,20 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.impl.DefaultComponent; +import org.apache.camel.spi.Metadata; import org.apache.camel.util.ObjectHelper; public class SqsComponent extends DefaultComponent { + @Metadata + private String accessKey; + @Metadata + private String secretKey; + @Metadata + private String region; + @Metadata(label = "advanced") + private SqsConfiguration configuration; + public SqsComponent() { this(null); } @@ -32,12 +42,13 @@ public class SqsComponent extends DefaultComponent { public SqsComponent(CamelContext context) { super(context); + this.configuration = new SqsConfiguration(); registerExtension(new SqsComponentVerifierExtension()); } @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - SqsConfiguration configuration = new SqsConfiguration(); + final SqsConfiguration configuration = this.configuration.copy(); setProperties(configuration, parameters); if (remaining == null || remaining.trim().length() == 0) { @@ -56,6 +67,16 @@ public class SqsComponent extends DefaultComponent { configuration.setQueueName(remaining); } + if (ObjectHelper.isEmpty(configuration.getAccessKey())) { + setAccessKey(accessKey); + } + if (ObjectHelper.isEmpty(configuration.getSecretKey())) { + setSecretKey(secretKey); + } + if (ObjectHelper.isEmpty(configuration.getRegion())) { + setRegion(region); + } + if (configuration.getAmazonSQSClient() == null && (configuration.getAccessKey() == null || configuration.getSecretKey() == null)) { throw new IllegalArgumentException("AmazonSQSClient or accessKey and secretKey must be specified."); } @@ -73,4 +94,48 @@ public class SqsComponent extends DefaultComponent { sqsEndpoint.setConsumerProperties(parameters); return sqsEndpoint; } + + public SqsConfiguration getConfiguration() { + return configuration; + } + + /** + * The AWS SQS default configuration + */ + public void setConfiguration(SqsConfiguration configuration) { + this.configuration = configuration; + } + + public String getAccessKey() { + return configuration.getAccessKey(); + } + + /** + * Amazon AWS Access Key + */ + public void setAccessKey(String accessKey) { + configuration.setAccessKey(accessKey); + } + + public String getSecretKey() { + return configuration.getSecretKey(); + } + + /** + * Amazon AWS Secret Key + */ + public void setSecretKey(String secretKey) { + configuration.setSecretKey(secretKey); + } + + public String getRegion() { + return configuration.getRegion(); + } + + /** + * Specify the queue region which could be used with queueOwnerAWSAccountId to build the service URL. + */ + public void setRegion(String region) { + configuration.setRegion(region); + } } diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java index ec0297d..5817506 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java @@ -17,11 +17,13 @@ package org.apache.camel.component.aws.sqs; import com.amazonaws.services.sqs.AmazonSQS; + +import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; @UriParams -public class SqsConfiguration { +public class SqsConfiguration implements Cloneable { // common properties private String queueName; @@ -420,4 +422,16 @@ public class SqsConfiguration { throw new IllegalArgumentException("Unrecognised MessageDeduplicationIdStrategy: " + strategy); } } + + // ************************************************* + // + // ************************************************* + + public SqsConfiguration copy() { + try { + return (SqsConfiguration)super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } } diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java index 633d6e4..28647d4 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsComponentConfigurationTest.java @@ -21,6 +21,8 @@ import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; +import com.amazonaws.regions.Regions; + public class SqsComponentConfigurationTest extends CamelTestSupport { @Test @@ -220,4 +222,33 @@ public class SqsComponentConfigurationTest extends CamelTestSupport { SqsComponent component = new SqsComponent(context); component.createEndpoint("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"); } + + @Test + public void createEndpointWithComponentElements() throws Exception { + AmazonSQSClientMock mock = new AmazonSQSClientMock(); + + ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("amazonSQSClient", mock); + SqsComponent component = new SqsComponent(context); + component.setAccessKey("XXX"); + component.setSecretKey("YYY"); + SqsEndpoint endpoint = (SqsEndpoint)component.createEndpoint("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"); + + assertEquals("MyQueue", endpoint.getConfiguration().getQueueName()); + assertEquals("XXX", endpoint.getConfiguration().getAccessKey()); + assertEquals("YYY", endpoint.getConfiguration().getSecretKey()); + } + + @Test + public void createEndpointWithComponentAndEndpointElements() throws Exception { + SqsComponent component = new SqsComponent(context); + component.setAccessKey("XXX"); + component.setSecretKey("YYY"); + component.setRegion(Regions.US_WEST_1.toString()); + SqsEndpoint endpoint = (SqsEndpoint)component.createEndpoint("aws-sqs://MyQueue?accessKey=xxxxxx&secretKey=yyyyy®ion=US_EAST_1"); + + assertEquals("MyQueue", endpoint.getConfiguration().getQueueName()); + assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey()); + assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); + assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion()); + } } diff --git a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/sqs/springboot/SqsComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/sqs/springboot/SqsComponentConfiguration.java index 654ffcf..9d24428 100644 --- a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/sqs/springboot/SqsComponentConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/sqs/springboot/SqsComponentConfiguration.java @@ -17,6 +17,7 @@ package org.apache.camel.component.aws.sqs.springboot; import javax.annotation.Generated; +import com.amazonaws.services.sqs.AmazonSQS; import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -33,12 +34,62 @@ public class SqsComponentConfiguration ComponentConfigurationPropertiesCommon { /** + * The AWS SQS default configuration + */ + private SqsConfigurationNestedConfiguration configuration; + /** + * Amazon AWS Access Key + */ + private String accessKey; + /** + * Amazon AWS Secret Key + */ + private String secretKey; + /** + * Specify the queue region which could be used with queueOwnerAWSAccountId + * to build the service URL. + */ + private String region; + /** * Whether the component should resolve property placeholders on itself when * starting. Only properties which are of String type can use property * placeholders. */ private Boolean resolvePropertyPlaceholders = true; + public SqsConfigurationNestedConfiguration getConfiguration() { + return configuration; + } + + public void setConfiguration( + SqsConfigurationNestedConfiguration configuration) { + this.configuration = configuration; + } + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + public Boolean getResolvePropertyPlaceholders() { return resolvePropertyPlaceholders; } @@ -47,4 +98,325 @@ public class SqsComponentConfiguration Boolean resolvePropertyPlaceholders) { this.resolvePropertyPlaceholders = resolvePropertyPlaceholders; } + + public static class SqsConfigurationNestedConfiguration { + public static final Class CAMEL_NESTED_CLASS = org.apache.camel.component.aws.sqs.SqsConfiguration.class; + /** + * The region with which the AWS-SQS client wants to work with. Only + * works if Camel creates the AWS-SQS client, i.e., if you explicitly + * set amazonSQSClient, then this setting will have no effect. You would + * have to set it on the client you create directly + */ + private String amazonSQSEndpoint; + /** + * The hostname of the Amazon AWS cloud. + */ + private String amazonAWSHost = "amazonaws.com"; + /** + * Name of queue. The queue will be created if they don't already + * exists. + */ + private String queueName; + /** + * Amazon AWS Access Key + */ + private String accessKey; + /** + * Amazon AWS Secret Key + */ + private String secretKey; + /** + * Delete message from SQS after it has been read + */ + private Boolean deleteAfterRead = true; + /** + * To use the AmazonSQS as client + */ + private AmazonSQS amazonSQSClient; + /** + * The duration (in seconds) that the received messages are hidden from + * subsequent retrieve requests after being retrieved by a + * ReceiveMessage request to set in the + * com.amazonaws.services.sqs.model.SetQueueAttributesRequest. This only + * make sense if its different from defaultVisibilityTimeout. It changes + * the queue visibility timeout attribute permanently. + */ + private Integer visibilityTimeout; + /** + * A list of attribute names to receive when consuming. Multiple names + * can be separated by comma. + */ + private String attributeNames; + /** + * A list of message attribute names to receive when consuming. Multiple + * names can be separated by comma. + */ + private String messageAttributeNames; + /** + * The default visibility timeout (in seconds) + */ + private Integer defaultVisibilityTimeout; + /** + * Delay sending messages for a number of seconds. + */ + private Integer delaySeconds; + /** + * The maximumMessageSize (in bytes) an SQS message can contain for this + * queue. + */ + private Integer maximumMessageSize; + /** + * The messageRetentionPeriod (in seconds) a message will be retained by + * SQS for this queue. + */ + private Integer messageRetentionPeriod; + /** + * The policy for this queue + */ + private String policy; + /** + * Specify the policy that send message to DeadLetter queue. See detail + * at Amazon docs. + */ + private String redrivePolicy; + /** + * If enabled then a scheduled background task will keep extending the + * message visibility on SQS. This is needed if it takes a long time to + * process the message. If set to true defaultVisibilityTimeout must be + * set. See details at Amazon docs. + */ + private Boolean extendMessageVisibility = false; + /** + * If you do not specify WaitTimeSeconds in the request, the queue + * attribute ReceiveMessageWaitTimeSeconds is used to determine how long + * to wait. + */ + private Integer receiveMessageWaitTimeSeconds; + /** + * Duration in seconds (0 to 20) that the ReceiveMessage action call + * will wait until a message is in the queue to include in the response. + */ + private Integer waitTimeSeconds; + /** + * Specify the queue owner aws account id when you need to connect the + * queue with different account owner. + */ + private String queueOwnerAWSAccountId; + /** + * Whether or not to send the DeleteMessage to the SQS queue if an + * exchange fails to get through a filter. If 'false' and exchange does + * not make it through a Camel filter upstream in the route, then don't + * send DeleteMessage. + */ + private Boolean deleteIfFiltered = true; + /** + * Specify the queue region which could be used with + * queueOwnerAWSAccountId to build the service URL. + */ + private String region; + private Integer concurrentConsumers = 1; + private String proxyHost; + private Integer proxyPort; + + public String getAmazonSQSEndpoint() { + return amazonSQSEndpoint; + } + + public void setAmazonSQSEndpoint(String amazonSQSEndpoint) { + this.amazonSQSEndpoint = amazonSQSEndpoint; + } + + public String getAmazonAWSHost() { + return amazonAWSHost; + } + + public void setAmazonAWSHost(String amazonAWSHost) { + this.amazonAWSHost = amazonAWSHost; + } + + public String getQueueName() { + return queueName; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public Boolean getDeleteAfterRead() { + return deleteAfterRead; + } + + public void setDeleteAfterRead(Boolean deleteAfterRead) { + this.deleteAfterRead = deleteAfterRead; + } + + public AmazonSQS getAmazonSQSClient() { + return amazonSQSClient; + } + + public void setAmazonSQSClient(AmazonSQS amazonSQSClient) { + this.amazonSQSClient = amazonSQSClient; + } + + public Integer getVisibilityTimeout() { + return visibilityTimeout; + } + + public void setVisibilityTimeout(Integer visibilityTimeout) { + this.visibilityTimeout = visibilityTimeout; + } + + public String getAttributeNames() { + return attributeNames; + } + + public void setAttributeNames(String attributeNames) { + this.attributeNames = attributeNames; + } + + public String getMessageAttributeNames() { + return messageAttributeNames; + } + + public void setMessageAttributeNames(String messageAttributeNames) { + this.messageAttributeNames = messageAttributeNames; + } + + public Integer getDefaultVisibilityTimeout() { + return defaultVisibilityTimeout; + } + + public void setDefaultVisibilityTimeout(Integer defaultVisibilityTimeout) { + this.defaultVisibilityTimeout = defaultVisibilityTimeout; + } + + public Integer getDelaySeconds() { + return delaySeconds; + } + + public void setDelaySeconds(Integer delaySeconds) { + this.delaySeconds = delaySeconds; + } + + public Integer getMaximumMessageSize() { + return maximumMessageSize; + } + + public void setMaximumMessageSize(Integer maximumMessageSize) { + this.maximumMessageSize = maximumMessageSize; + } + + public Integer getMessageRetentionPeriod() { + return messageRetentionPeriod; + } + + public void setMessageRetentionPeriod(Integer messageRetentionPeriod) { + this.messageRetentionPeriod = messageRetentionPeriod; + } + + public String getPolicy() { + return policy; + } + + public void setPolicy(String policy) { + this.policy = policy; + } + + public String getRedrivePolicy() { + return redrivePolicy; + } + + public void setRedrivePolicy(String redrivePolicy) { + this.redrivePolicy = redrivePolicy; + } + + public Boolean getExtendMessageVisibility() { + return extendMessageVisibility; + } + + public void setExtendMessageVisibility(Boolean extendMessageVisibility) { + this.extendMessageVisibility = extendMessageVisibility; + } + + public Integer getReceiveMessageWaitTimeSeconds() { + return receiveMessageWaitTimeSeconds; + } + + public void setReceiveMessageWaitTimeSeconds( + Integer receiveMessageWaitTimeSeconds) { + this.receiveMessageWaitTimeSeconds = receiveMessageWaitTimeSeconds; + } + + public Integer getWaitTimeSeconds() { + return waitTimeSeconds; + } + + public void setWaitTimeSeconds(Integer waitTimeSeconds) { + this.waitTimeSeconds = waitTimeSeconds; + } + + public String getQueueOwnerAWSAccountId() { + return queueOwnerAWSAccountId; + } + + public void setQueueOwnerAWSAccountId(String queueOwnerAWSAccountId) { + this.queueOwnerAWSAccountId = queueOwnerAWSAccountId; + } + + public Boolean getDeleteIfFiltered() { + return deleteIfFiltered; + } + + public void setDeleteIfFiltered(Boolean deleteIfFiltered) { + this.deleteIfFiltered = deleteIfFiltered; + } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + + public Integer getConcurrentConsumers() { + return concurrentConsumers; + } + + public void setConcurrentConsumers(Integer concurrentConsumers) { + this.concurrentConsumers = concurrentConsumers; + } + + public String getProxyHost() { + return proxyHost; + } + + public void setProxyHost(String proxyHost) { + this.proxyHost = proxyHost; + } + + public Integer getProxyPort() { + return proxyPort; + } + + public void setProxyPort(Integer proxyPort) { + this.proxyPort = proxyPort; + } + } } \ No newline at end of file -- To stop receiving notification emails like this one, please contact "commits@camel.apache.org" <commits@camel.apache.org>.