This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch async-aws-sns
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9605406b986b57edfd7415bd569654fab34a11d3
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Jul 2 08:38:09 2019 +0200

    
https://stackoverflow.com/questions/56775883/camel-sns-how-to-use-async-client-without-blocking?noredirect=1#comment100229953_56775883 - Work in progress
---
 .../camel/component/aws/sns/SnsComponent.java      |  3 +-
 .../camel/component/aws/sns/SnsConfiguration.java  |  9 ++--
 .../camel/component/aws/sns/SnsEndpoint.java       | 23 +++++----
 .../camel/component/aws/sns/SnsProducer.java       | 56 ++++++++++++----------
 4 files changed, 53 insertions(+), 38 deletions(-)

diff --git a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsComponent.java b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsComponent.java
index aaa1bcd..2a488b3 100644
--- a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsComponent.java
+++ b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsComponent.java
@@ -22,6 +22,7 @@ import java.util.Set;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sns.AmazonSNS;
 
+import com.amazonaws.services.sns.AmazonSNSAsync;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.spi.Metadata;
@@ -133,7 +134,7 @@ public class SnsComponent extends DefaultComponent {
     }
     
     private void checkAndSetRegistryClient(SnsConfiguration configuration) {
-        Set<AmazonSNS> clients = 
getCamelContext().getRegistry().findByType(AmazonSNS.class);
+        Set<AmazonSNSAsync> clients = 
getCamelContext().getRegistry().findByType(AmazonSNSAsync.class);
         if (clients.size() == 1) {
             
configuration.setAmazonSNSClient(clients.stream().findFirst().get());
         }
diff --git a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java
index 7690de8..a6bdc53 100644
--- a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java
+++ b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java
@@ -17,6 +17,9 @@
 package org.apache.camel.component.aws.sns;
 
 import com.amazonaws.services.sns.AmazonSNS;
+import com.amazonaws.services.sns.AmazonSNSAsync;
+import com.amazonaws.services.sns.AmazonSNSAsyncClient;
+import com.amazonaws.services.sns.AmazonSNSClient;
 import com.amazonaws.services.sqs.AmazonSQS;
 
 import org.apache.camel.RuntimeCamelException;
@@ -31,7 +34,7 @@ public class SnsConfiguration implements Cloneable {
     // Common properties
     private String topicName;
     @UriParam
-    private AmazonSNS amazonSNSClient;
+    private AmazonSNSAsync amazonSNSClient;
     @UriParam(label = "security", secret = true)
     private String accessKey;
     @UriParam(label = "security", secret = true)
@@ -108,14 +111,14 @@ public class SnsConfiguration implements Cloneable {
         this.secretKey = secretKey;
     }
 
-    public AmazonSNS getAmazonSNSClient() {
+    public AmazonSNSAsync getAmazonSNSClient() {
         return amazonSNSClient;
     }
 
     /**
      * To use the AmazonSNS as the client
      */
-    public void setAmazonSNSClient(AmazonSNS amazonSNSClient) {
+    public void setAmazonSNSClient(AmazonSNSAsync amazonSNSClient) {
         this.amazonSNSClient = amazonSNSClient;
     }
 
diff --git a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
index 64636b4..b6e750b 100644
--- a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
+++ b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
@@ -27,6 +27,9 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sns.AmazonSNS;
+import com.amazonaws.services.sns.AmazonSNSAsync;
+import com.amazonaws.services.sns.AmazonSNSAsyncClient;
+import com.amazonaws.services.sns.AmazonSNSAsyncClientBuilder;
 import com.amazonaws.services.sns.AmazonSNSClientBuilder;
 import com.amazonaws.services.sns.model.CreateTopicRequest;
 import com.amazonaws.services.sns.model.CreateTopicResult;
@@ -55,7 +58,7 @@ import org.apache.camel.util.ObjectHelper;
     producerOnly = true, label = "cloud,mobile,messaging")
 public class SnsEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategyAware {
 
-    private AmazonSNS snsClient;
+    private AmazonSNSAsync snsClient;
 
     @UriPath(description = "Topic name or ARN")
     @Metadata(required = true)
@@ -178,11 +181,11 @@ public class SnsEndpoint extends DefaultEndpoint 
implements HeaderFilterStrategy
         this.configuration = configuration;
     }
     
-    public void setSNSClient(AmazonSNS snsClient) {
+    public void setSNSClient(AmazonSNSAsync snsClient) {
         this.snsClient = snsClient;
     }
     
-    public AmazonSNS getSNSClient() {
+    public AmazonSNSAsync getSNSClient() {
         return snsClient;
     }
 
@@ -191,9 +194,9 @@ public class SnsEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategy
      *
      * @return AmazonSNSClient
      */
-    AmazonSNS createSNSClient() {
-        AmazonSNS client = null;
-        AmazonSNSClientBuilder clientBuilder = null;
+    AmazonSNSAsync createSNSClient() {
+        AmazonSNSAsync client;
+        AmazonSNSAsyncClientBuilder clientBuilder;
         ClientConfiguration clientConfiguration = null;
         boolean isClientConfigFound = false;
         if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
@@ -206,15 +209,15 @@ public class SnsEndpoint extends DefaultEndpoint 
implements HeaderFilterStrategy
             AWSCredentials credentials = new 
BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey());
             AWSCredentialsProvider credentialsProvider = new 
AWSStaticCredentialsProvider(credentials);
             if (isClientConfigFound) {
-                clientBuilder = 
AmazonSNSClientBuilder.standard().withClientConfiguration(clientConfiguration).withCredentials(credentialsProvider);
+                clientBuilder = 
AmazonSNSAsyncClientBuilder.standard().withClientConfiguration(clientConfiguration).withCredentials(credentialsProvider);
             } else {
-                clientBuilder = 
AmazonSNSClientBuilder.standard().withCredentials(credentialsProvider);
+                clientBuilder = 
AmazonSNSAsyncClientBuilder.standard().withCredentials(credentialsProvider);
             }
         } else {
             if (isClientConfigFound) {
-                clientBuilder = AmazonSNSClientBuilder.standard();
+                clientBuilder = AmazonSNSAsyncClientBuilder.standard();
             } else {
-                clientBuilder = 
AmazonSNSClientBuilder.standard().withClientConfiguration(clientConfiguration);
+                clientBuilder = 
AmazonSNSAsyncClientBuilder.standard().withClientConfiguration(clientConfiguration);
             }
         }
         if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
diff --git a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
index 7eaea77..ac75ffc 100644
--- a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
+++ b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
@@ -22,22 +22,22 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import com.amazonaws.handlers.AsyncHandler;
 import com.amazonaws.services.sns.model.MessageAttributeValue;
 import com.amazonaws.services.sns.model.PublishRequest;
 import com.amazonaws.services.sns.model.PublishResult;
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.spi.HeaderFilterStrategy;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.util.URISupport;
 
-
 /**
  * A Producer which sends messages to the Amazon Web Service Simple Notification Service
  * <a href="http://aws.amazon.com/sns/">AWS SNS</a>
  */
-public class SnsProducer extends DefaultProducer {
+public class SnsProducer extends DefaultAsyncProducer {
 
     private transient String snsProducerToString;
 
@@ -45,23 +45,39 @@ public class SnsProducer extends DefaultProducer {
         super(endpoint);
     }
 
-    public void process(Exchange exchange) throws Exception {
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
         PublishRequest request = new PublishRequest();
-
-        request.setTopicArn(getConfiguration().getTopicArn());
-        request.setSubject(determineSubject(exchange));
-        request.setMessageStructure(determineMessageStructure(exchange));
-        request.setMessage(exchange.getIn().getBody(String.class));
-        
request.setMessageAttributes(this.translateAttributes(exchange.getIn().getHeaders(),
 exchange));
+        try {
+            request.setTopicArn(getConfiguration().getTopicArn());
+            request.setSubject(determineSubject(exchange));
+            request.setMessageStructure(determineMessageStructure(exchange));
+            request.setMessage(exchange.getIn().getBody(String.class));
+            
request.setMessageAttributes(this.translateAttributes(exchange.getIn().getHeaders(),
 exchange));
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
 
         log.trace("Sending request [{}] from exchange [{}]...", request, 
exchange);
+        getEndpoint().getSNSClient().publishAsync(request, new 
AsyncHandler<PublishRequest, PublishResult>() {
+            @Override
+            public void onError(Exception e) {
+                log.trace("Received error", e);
+                exchange.setException(e);
+                callback.done(false);
+            }
 
-        PublishResult result = getEndpoint().getSNSClient().publish(request);
-
-        log.trace("Received result [{}]", result);
+            @Override
+            public void onSuccess(PublishRequest request, PublishResult 
result) {
+                log.trace("Received result [{}]", result);
+                exchange.getMessage().setHeader(SnsConstants.MESSAGE_ID, 
result.getMessageId());
+                callback.done(false);
+            }
+        });
 
-        Message message = getMessageForResponse(exchange);
-        message.setHeader(SnsConstants.MESSAGE_ID, result.getMessageId());
+        return false;
     }
 
     private String determineSubject(Exchange exchange) {
@@ -130,12 +146,4 @@ public class SnsProducer extends DefaultProducer {
         return (SnsEndpoint) super.getEndpoint();
     }
 
-    public static Message getMessageForResponse(final Exchange exchange) {
-        if (exchange.getPattern().isOutCapable()) {
-            Message out = exchange.getOut();
-            out.copyFrom(exchange.getIn());
-            return out;
-        }
-        return exchange.getIn();
-    }
 }
\ No newline at end of file

Reply via email to