This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 381b633a4f4d2a4be7b41ec399c87bb189c32b12
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Fri Dec 21 11:16:17 2018 +0100

    CAMEL-13013 - Camel-AWS: Being able to create a subscription between an AWS 
SNS Topic and an AWS SQS Queue
---
 .../camel/component/aws/sns/SnsConfiguration.java  | 40 ++++++++++++++++++
 .../camel/component/aws/sns/SnsEndpoint.java       | 11 +++++
 .../aws/sns/SnsComponentConfigurationTest.java     | 48 +++++++++++++++++++++-
 .../camel-aws/src/test/resources/log4j2.properties |  2 +-
 4 files changed, 99 insertions(+), 2 deletions(-)

diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java
index 80f5ef2..61b17f6 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.aws.sns;
 
 import com.amazonaws.services.sns.AmazonSNS;
+import com.amazonaws.services.sqs.AmazonSQS;
 
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.UriParam;
@@ -39,6 +40,12 @@ public class SnsConfiguration implements Cloneable {
     private String proxyHost;
     @UriParam
     private Integer proxyPort;
+    @UriParam
+    private AmazonSQS amazonSQSClient;
+    @UriParam
+    private String queueUrl;
+    @UriParam
+    private boolean subscribeSNStoSQS;
 
     // Producer only properties
     @UriParam
@@ -170,6 +177,39 @@ public class SnsConfiguration implements Cloneable {
     public void setRegion(String region) {
         this.region = region;
     }
+
+    public AmazonSQS getAmazonSQSClient() {
+        return amazonSQSClient;
+    }
+
+    /**
+     * The Amazon SQS client used when subscribing the SQS queue to the SNS topic
+     */
+    public void setAmazonSQSClient(AmazonSQS amazonSQSClient) {
+        this.amazonSQSClient = amazonSQSClient;
+    }
+
+    public String getQueueUrl() {
+        return queueUrl;
+    }
+
+    /**
+     * The URL of the SQS queue to subscribe to the SNS topic
+     */
+    public void setQueueUrl(String queueUrl) {
+        this.queueUrl = queueUrl;
+    }
+
+    public boolean isSubscribeSNStoSQS() {
+        return subscribeSNStoSQS;
+    }
+
+    /**
+     * Whether to subscribe the SQS queue to the SNS topic (requires amazonSQSClient and queueUrl)
+     */
+    public void setSubscribeSNStoSQS(boolean subscribeSNStoSQS) {
+        this.subscribeSNStoSQS = subscribeSNStoSQS;
+    }
     
     // *************************************************
     //
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
index ae2da3f..04cd02d 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
@@ -30,6 +30,7 @@ import com.amazonaws.services.sns.model.CreateTopicResult;
 import com.amazonaws.services.sns.model.ListTopicsResult;
 import com.amazonaws.services.sns.model.SetTopicAttributesRequest;
 import com.amazonaws.services.sns.model.Topic;
+import com.amazonaws.services.sns.util.Topics;
 
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
@@ -100,6 +101,7 @@ public class SnsEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategy
             headerFilterStrategy = new SnsHeaderFilterStrategy();
         }
         
+        log.trace("Configured topic ARN: {}", configuration.getTopicArn());
         if (configuration.getTopicArn() == null) {
             try {
                 String nextToken = null;
@@ -141,6 +143,15 @@ public class SnsEndpoint extends DefaultEndpoint 
implements HeaderFilterStrategy
             log.trace("Topic policy updated");
         }
         
+        if (configuration.isSubscribeSNStoSQS()) {
+            if (ObjectHelper.isNotEmpty(configuration.getAmazonSQSClient()) && 
ObjectHelper.isNotEmpty(configuration.getQueueUrl())) {
+                String subscriptionARN = Topics.subscribeQueue(snsClient, 
configuration.getAmazonSQSClient(), configuration.getTopicArn(), 
configuration.getQueueUrl());
+                log.trace("Subscription of SQS Queue to SNS Topic done with 
Amazon resource name: {}", subscriptionARN);
+            } else {
+                throw new IllegalArgumentException("Using the 
SubscribeSNStoSQS option require both AmazonSQSClient and Queue URL options");
+            }
+        }
+        
     }
     
     @Override
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sns/SnsComponentConfigurationTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sns/SnsComponentConfigurationTest.java
index 41a9abd..38da85e 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sns/SnsComponentConfigurationTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sns/SnsComponentConfigurationTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.aws.sns;
 
 import com.amazonaws.regions.Regions;
 
+import org.apache.camel.component.aws.sqs.AmazonSQSClientMock;
 import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -41,6 +42,7 @@ public class SnsComponentConfigurationTest extends 
CamelTestSupport {
         assertNull(endpoint.getConfiguration().getSubject());
         assertNull(endpoint.getConfiguration().getPolicy());
     }
+    
     @Test
     public void createEndpointWithOnlyAccessKeyAndSecretKey() throws Exception 
{
         SnsComponent component = new SnsComponent(context);
@@ -112,6 +114,50 @@ public class SnsComponentConfigurationTest extends 
CamelTestSupport {
                 endpoint.getConfiguration().getPolicy());
     }
     
+    @Test
+    public void createEndpointWithSQSSubscription() throws Exception {
+        AmazonSNSClientMock mock = new AmazonSNSClientMock();
+        AmazonSQSClientMock mockSQS = new AmazonSQSClientMock();
+        
+        ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) 
context.getRegistry()).getRegistry()).bind("amazonSNSClient", mock);
+        ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) 
context.getRegistry()).getRegistry()).bind("amazonSQSClient", mockSQS);
+        SnsComponent component = new SnsComponent(context);
+        SnsEndpoint endpoint = (SnsEndpoint) 
component.createEndpoint("aws-sns://MyTopic?amazonSNSClient=#amazonSNSClient&accessKey=xxx&secretKey=yyy&amazonSQSClient=#amazonSQSClient&queueUrl=arn:aws:sqs:us-east-1:541925086079:MyQueue&subscribeSNStoSQS=true"); // endpoint is created but never started in this test; only the parsed configuration is asserted
+        
+        assertEquals("MyTopic", endpoint.getConfiguration().getTopicName());
+        assertEquals("xxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyy", endpoint.getConfiguration().getSecretKey());
+        assertEquals("arn:aws:sqs:us-east-1:541925086079:MyQueue", 
endpoint.getConfiguration().getQueueUrl());
+        assertNotNull(endpoint.getConfiguration().getAmazonSNSClient());
+        assertNotNull(endpoint.getConfiguration().getAmazonSQSClient());
+        assertNull(endpoint.getConfiguration().getTopicArn());
+        assertNull(endpoint.getConfiguration().getSubject());
+        assertNull(endpoint.getConfiguration().getPolicy());
+    }
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void createEndpointWithSQSSubscriptionIllegalArgument() throws 
Exception {
+        AmazonSNSClientMock mock = new AmazonSNSClientMock();
+        AmazonSQSClientMock mockSQS = new AmazonSQSClientMock();
+        
+        ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) 
context.getRegistry()).getRegistry()).bind("amazonSNSClient", mock);
+        ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) 
context.getRegistry()).getRegistry()).bind("amazonSQSClient", mockSQS);
+        SnsComponent component = new SnsComponent(context);
+        SnsEndpoint endpoint = (SnsEndpoint) 
component.createEndpoint("aws-sns://MyTopic?amazonSNSClient=#amazonSNSClient&accessKey=xxx&secretKey=yyy&amazonSQSClient=#amazonSQSClient&subscribeSNStoSQS=true");
+        
+        assertEquals("MyTopic", endpoint.getConfiguration().getTopicName());
+        assertEquals("xxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyy", endpoint.getConfiguration().getSecretKey());
+        assertNull(endpoint.getConfiguration().getQueueUrl());
+        assertNotNull(endpoint.getConfiguration().getAmazonSNSClient());
+        assertNotNull(endpoint.getConfiguration().getAmazonSQSClient());
+        assertNull(endpoint.getConfiguration().getTopicArn());
+        assertNull(endpoint.getConfiguration().getSubject());
+        assertNull(endpoint.getConfiguration().getPolicy());
+        // subscribeSNStoSQS=true without queueUrl: starting the endpoint is expected to throw IllegalArgumentException
+        endpoint.start();
+    }
+    
     @Test(expected = IllegalArgumentException.class)
     public void createEndpointWithoutAccessKeyConfiguration() throws Exception 
{
         SnsComponent component = new SnsComponent(context);
@@ -155,7 +201,7 @@ public class SnsComponentConfigurationTest extends 
CamelTestSupport {
         AmazonSNSClientMock mock = new AmazonSNSClientMock();
         
         ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) 
context.getRegistry()).getRegistry()).bind("amazonSNSClient", mock);
-          
+
         SnsComponent component = new SnsComponent(context);
         
component.createEndpoint("aws-sns://MyTopic?amazonSNSClient=#amazonSNSClient");
     }
diff --git a/components/camel-aws/src/test/resources/log4j2.properties 
b/components/camel-aws/src/test/resources/log4j2.properties
index a8ae3ed..566422d 100644
--- a/components/camel-aws/src/test/resources/log4j2.properties
+++ b/components/camel-aws/src/test/resources/log4j2.properties
@@ -24,5 +24,5 @@ appender.out.type = Console
 appender.out.name = out
 appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
-rootLogger.level = DEBUG
+rootLogger.level = INFO
 rootLogger.appenderRef.file.ref = file

Reply via email to