This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 381b633a4f4d2a4be7b41ec399c87bb189c32b12 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Fri Dec 21 11:16:17 2018 +0100 CAMEL-13013 - Camel-AWS: Being able to create a subscription between an AWS SNS Topic and an AWS SQS Queue --- .../camel/component/aws/sns/SnsConfiguration.java | 40 ++++++++++++++++++ .../camel/component/aws/sns/SnsEndpoint.java | 11 +++++ .../aws/sns/SnsComponentConfigurationTest.java | 48 +++++++++++++++++++++- .../camel-aws/src/test/resources/log4j2.properties | 2 +- 4 files changed, 99 insertions(+), 2 deletions(-) diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java index 80f5ef2..61b17f6 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java @@ -17,6 +17,7 @@ package org.apache.camel.component.aws.sns; import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sqs.AmazonSQS; import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.UriParam; @@ -39,6 +40,12 @@ public class SnsConfiguration implements Cloneable { private String proxyHost; @UriParam private Integer proxyPort; + @UriParam + private AmazonSQS amazonSQSClient; + @UriParam + private String queueUrl; + @UriParam + private boolean subscribeSNStoSQS; // Producer only properties @UriParam @@ -170,6 +177,39 @@ public class SnsConfiguration implements Cloneable { public void setRegion(String region) { this.region = region; } + + public AmazonSQS getAmazonSQSClient() { + return amazonSQSClient; + } + + /** + * An SQS Client to use as bridge between SNS and SQS + */ + public void setAmazonSQSClient(AmazonSQS amazonSQSClient) { + this.amazonSQSClient = amazonSQSClient; + } + + public String getQueueUrl() { + return queueUrl; + } + + /** + * The queueUrl to subscribe to + */ + public void setQueueUrl(String queueUrl) { + this.queueUrl = queueUrl; + } + + public boolean isSubscribeSNStoSQS() { + return subscribeSNStoSQS; + } + + /** + * Define if the subscription between SNS Topic and SQS must be done or not + */ + public void setSubscribeSNStoSQS(boolean subscribeSNStoSQS) { + this.subscribeSNStoSQS = subscribeSNStoSQS; + } // ************************************************* // diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java index ae2da3f..04cd02d 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java @@ -30,6 +30,7 @@ import com.amazonaws.services.sns.model.CreateTopicResult; import com.amazonaws.services.sns.model.ListTopicsResult; import com.amazonaws.services.sns.model.SetTopicAttributesRequest; import com.amazonaws.services.sns.model.Topic; +import com.amazonaws.services.sns.util.Topics; import org.apache.camel.Component; import org.apache.camel.Consumer; @@ -100,6 +101,7 @@ public class SnsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy headerFilterStrategy = new SnsHeaderFilterStrategy(); } + System.err.println(configuration.getTopicArn()); if (configuration.getTopicArn() == null) { try { String nextToken = null; @@ -141,6 +143,15 @@ public class SnsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy log.trace("Topic policy updated"); } + if (configuration.isSubscribeSNStoSQS()) { + if (ObjectHelper.isNotEmpty(configuration.getAmazonSQSClient()) && ObjectHelper.isNotEmpty(configuration.getQueueUrl())) { + String subscriptionARN = Topics.subscribeQueue(snsClient, configuration.getAmazonSQSClient(), configuration.getTopicArn(), configuration.getQueueUrl()); + log.trace("Subscription of SQS Queue to SNS Topic done with Amazon resource name: {}", subscriptionARN); + } else { + throw new IllegalArgumentException("Using the SubscribeSNStoSQS option require both AmazonSQSClient and Queue URL options"); + } + } + } @Override diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sns/SnsComponentConfigurationTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sns/SnsComponentConfigurationTest.java index 41a9abd..38da85e 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sns/SnsComponentConfigurationTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sns/SnsComponentConfigurationTest.java @@ -18,6 +18,7 @@ package org.apache.camel.component.aws.sns; import com.amazonaws.regions.Regions; +import org.apache.camel.component.aws.sqs.AmazonSQSClientMock; import org.apache.camel.impl.JndiRegistry; import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry; import org.apache.camel.test.junit4.CamelTestSupport; @@ -41,6 +42,7 @@ public class SnsComponentConfigurationTest extends CamelTestSupport { assertNull(endpoint.getConfiguration().getSubject()); assertNull(endpoint.getConfiguration().getPolicy()); } + @Test public void createEndpointWithOnlyAccessKeyAndSecretKey() throws Exception { SnsComponent component = new SnsComponent(context); @@ -112,6 +114,50 @@ public class SnsComponentConfigurationTest extends CamelTestSupport { endpoint.getConfiguration().getPolicy()); } + @Test + public void createEndpointWithSQSSubscription() throws Exception { + AmazonSNSClientMock mock = new AmazonSNSClientMock(); + AmazonSQSClientMock mockSQS = new AmazonSQSClientMock(); + + ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("amazonSNSClient", mock); + ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("amazonSQSClient", mockSQS); + SnsComponent component = new SnsComponent(context); + SnsEndpoint endpoint = (SnsEndpoint) component.createEndpoint("aws-sns://MyTopic?amazonSNSClient=#amazonSNSClient&accessKey=xxx&secretKey=yyy&amazonSQSClient=#amazonSQSClient&queueUrl=arn:aws:sqs:us-east-1:541925086079:MyQueue&subscribeSNStoSQS=true"); + + assertEquals("MyTopic", endpoint.getConfiguration().getTopicName()); + assertEquals("xxx", endpoint.getConfiguration().getAccessKey()); + assertEquals("yyy", endpoint.getConfiguration().getSecretKey()); + assertEquals("arn:aws:sqs:us-east-1:541925086079:MyQueue", endpoint.getConfiguration().getQueueUrl()); + assertNotNull(endpoint.getConfiguration().getAmazonSNSClient()); + assertNotNull(endpoint.getConfiguration().getAmazonSQSClient()); + assertNull(endpoint.getConfiguration().getTopicArn()); + assertNull(endpoint.getConfiguration().getSubject()); + assertNull(endpoint.getConfiguration().getPolicy()); + } + + @Test + public void createEndpointWithSQSSubscriptionIllegalArgument() throws Exception { + AmazonSNSClientMock mock = new AmazonSNSClientMock(); + AmazonSQSClientMock mockSQS = new AmazonSQSClientMock(); + + ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("amazonSNSClient", mock); + ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("amazonSQSClient", mockSQS); + SnsComponent component = new SnsComponent(context); + SnsEndpoint endpoint = (SnsEndpoint) component.createEndpoint("aws-sns://MyTopic?amazonSNSClient=#amazonSNSClient&accessKey=xxx&secretKey=yyy&amazonSQSClient=#amazonSQSClient&subscribeSNStoSQS=true"); + + assertEquals("MyTopic", endpoint.getConfiguration().getTopicName()); + assertEquals("xxx", endpoint.getConfiguration().getAccessKey()); + assertEquals("yyy", endpoint.getConfiguration().getSecretKey()); + assertNull(endpoint.getConfiguration().getQueueUrl()); + assertNotNull(endpoint.getConfiguration().getAmazonSNSClient()); + assertNotNull(endpoint.getConfiguration().getAmazonSQSClient()); + assertNull(endpoint.getConfiguration().getTopicArn()); + assertNull(endpoint.getConfiguration().getSubject()); + assertNull(endpoint.getConfiguration().getPolicy()); + + endpoint.start(); + } + @Test(expected = IllegalArgumentException.class) public void createEndpointWithoutAccessKeyConfiguration() throws Exception { SnsComponent component = new SnsComponent(context); @@ -155,7 +201,7 @@ public class SnsComponentConfigurationTest extends CamelTestSupport { AmazonSNSClientMock mock = new AmazonSNSClientMock(); ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("amazonSNSClient", mock); - + SnsComponent component = new SnsComponent(context); component.createEndpoint("aws-sns://MyTopic?amazonSNSClient=#amazonSNSClient"); } diff --git a/components/camel-aws/src/test/resources/log4j2.properties b/components/camel-aws/src/test/resources/log4j2.properties index a8ae3ed..566422d 100644 --- a/components/camel-aws/src/test/resources/log4j2.properties +++ b/components/camel-aws/src/test/resources/log4j2.properties @@ -24,5 +24,5 @@ appender.out.type = Console appender.out.name = out appender.out.layout.type = PatternLayout appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n -rootLogger.level = DEBUG +rootLogger.level = INFO rootLogger.appenderRef.file.ref = file