This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch async-aws-sns in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9605406b986b57edfd7415bd569654fab34a11d3 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Jul 2 08:38:09 2019 +0200 https://stackoverflow.com/questions/56775883/camel-sns-how-to-use-async-client-without-blocking?noredirect=1#comment100229953_56775883. Work in progress --- .../camel/component/aws/sns/SnsComponent.java | 3 +- .../camel/component/aws/sns/SnsConfiguration.java | 9 ++-- .../camel/component/aws/sns/SnsEndpoint.java | 23 +++++---- .../camel/component/aws/sns/SnsProducer.java | 56 ++++++++++++---------- 4 files changed, 53 insertions(+), 38 deletions(-) diff --git a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsComponent.java b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsComponent.java index aaa1bcd..2a488b3 100644 --- a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsComponent.java +++ b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsComponent.java @@ -22,6 +22,7 @@ import java.util.Set; import com.amazonaws.regions.Regions; import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sns.AmazonSNSAsync; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.spi.Metadata; @@ -133,7 +134,7 @@ public class SnsComponent extends DefaultComponent { } private void checkAndSetRegistryClient(SnsConfiguration configuration) { - Set<AmazonSNS> clients = getCamelContext().getRegistry().findByType(AmazonSNS.class); + Set<AmazonSNSAsync> clients = getCamelContext().getRegistry().findByType(AmazonSNSAsync.class); if (clients.size() == 1) { configuration.setAmazonSNSClient(clients.stream().findFirst().get()); } diff --git a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java index 7690de8..a6bdc53 100644 --- a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java +++ b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsConfiguration.java @@ -17,6 +17,9 @@ package org.apache.camel.component.aws.sns; import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sns.AmazonSNSAsync; +import com.amazonaws.services.sns.AmazonSNSAsyncClient; +import com.amazonaws.services.sns.AmazonSNSClient; import com.amazonaws.services.sqs.AmazonSQS; import org.apache.camel.RuntimeCamelException; @@ -31,7 +34,7 @@ public class SnsConfiguration implements Cloneable { // Common properties private String topicName; @UriParam - private AmazonSNS amazonSNSClient; + private AmazonSNSAsync amazonSNSClient; @UriParam(label = "security", secret = true) private String accessKey; @UriParam(label = "security", secret = true) @@ -108,14 +111,14 @@ public class SnsConfiguration implements Cloneable { this.secretKey = secretKey; } - public AmazonSNS getAmazonSNSClient() { + public AmazonSNSAsync getAmazonSNSClient() { return amazonSNSClient; } /** * To use the AmazonSNS as the client */ - public void setAmazonSNSClient(AmazonSNS amazonSNSClient) { + public void setAmazonSNSClient(AmazonSNSAsync amazonSNSClient) { this.amazonSNSClient = amazonSNSClient; } diff --git a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java index 64636b4..b6e750b 100644 --- a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java +++ b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java @@ -27,6 +27,9 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.regions.Regions; import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sns.AmazonSNSAsync; +import com.amazonaws.services.sns.AmazonSNSAsyncClient; +import com.amazonaws.services.sns.AmazonSNSAsyncClientBuilder; import com.amazonaws.services.sns.AmazonSNSClientBuilder; import com.amazonaws.services.sns.model.CreateTopicRequest; import com.amazonaws.services.sns.model.CreateTopicResult; @@ -55,7 +58,7 @@ import org.apache.camel.util.ObjectHelper; producerOnly = true, label = "cloud,mobile,messaging") public class SnsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { - private AmazonSNS snsClient; + private AmazonSNSAsync snsClient; @UriPath(description = "Topic name or ARN") @Metadata(required = true) @@ -178,11 +181,11 @@ public class SnsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy this.configuration = configuration; } - public void setSNSClient(AmazonSNS snsClient) { + public void setSNSClient(AmazonSNSAsync snsClient) { this.snsClient = snsClient; } - public AmazonSNS getSNSClient() { + public AmazonSNSAsync getSNSClient() { return snsClient; } @@ -191,9 +194,9 @@ public class SnsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy * * @return AmazonSNSClient */ - AmazonSNS createSNSClient() { - AmazonSNS client = null; - AmazonSNSClientBuilder clientBuilder = null; + AmazonSNSAsync createSNSClient() { + AmazonSNSAsync client; + AmazonSNSAsyncClientBuilder clientBuilder; ClientConfiguration clientConfiguration = null; boolean isClientConfigFound = false; if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { @@ -206,15 +209,15 @@ public class SnsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey()); AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials); if (isClientConfigFound) { - clientBuilder = AmazonSNSClientBuilder.standard().withClientConfiguration(clientConfiguration).withCredentials(credentialsProvider); + clientBuilder = AmazonSNSAsyncClientBuilder.standard().withClientConfiguration(clientConfiguration).withCredentials(credentialsProvider); } else { - clientBuilder = AmazonSNSClientBuilder.standard().withCredentials(credentialsProvider); + clientBuilder = AmazonSNSAsyncClientBuilder.standard().withCredentials(credentialsProvider); } } else { if (isClientConfigFound) { - clientBuilder = AmazonSNSClientBuilder.standard(); + clientBuilder = AmazonSNSAsyncClientBuilder.standard(); } else { - clientBuilder = AmazonSNSClientBuilder.standard().withClientConfiguration(clientConfiguration); + clientBuilder = AmazonSNSAsyncClientBuilder.standard().withClientConfiguration(clientConfiguration); } } if (ObjectHelper.isNotEmpty(configuration.getRegion())) { diff --git a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java index 7eaea77..ac75ffc 100644 --- a/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java +++ b/components/camel-aws-sns/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java @@ -22,22 +22,22 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import com.amazonaws.handlers.AsyncHandler; import com.amazonaws.services.sns.model.MessageAttributeValue; import com.amazonaws.services.sns.model.PublishRequest; import com.amazonaws.services.sns.model.PublishResult; +import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.Message; import org.apache.camel.spi.HeaderFilterStrategy; -import org.apache.camel.support.DefaultProducer; +import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.util.URISupport; - /** * A Producer which sends messages to the Amazon Web Service Simple Notification Service * <a href="http://aws.amazon.com/sns/">AWS SNS</a> */ -public class SnsProducer extends DefaultProducer { +public class SnsProducer extends DefaultAsyncProducer { private transient String snsProducerToString; @@ -45,23 +45,39 @@ public class SnsProducer extends DefaultProducer { super(endpoint); } - public void process(Exchange exchange) throws Exception { + @Override + public boolean process(final Exchange exchange, final AsyncCallback callback) { PublishRequest request = new PublishRequest(); - - request.setTopicArn(getConfiguration().getTopicArn()); - request.setSubject(determineSubject(exchange)); - request.setMessageStructure(determineMessageStructure(exchange)); - request.setMessage(exchange.getIn().getBody(String.class)); - request.setMessageAttributes(this.translateAttributes(exchange.getIn().getHeaders(), exchange)); + try { + request.setTopicArn(getConfiguration().getTopicArn()); + request.setSubject(determineSubject(exchange)); + request.setMessageStructure(determineMessageStructure(exchange)); + request.setMessage(exchange.getIn().getBody(String.class)); + request.setMessageAttributes(this.translateAttributes(exchange.getIn().getHeaders(), exchange)); + } catch (Exception e) { + exchange.setException(e); + callback.done(true); + return true; + } log.trace("Sending request [{}] from exchange [{}]...", request, exchange); + getEndpoint().getSNSClient().publishAsync(request, new AsyncHandler<PublishRequest, PublishResult>() { + @Override + public void onError(Exception e) { + log.trace("Received error", e); + exchange.setException(e); + callback.done(false); + } - PublishResult result = getEndpoint().getSNSClient().publish(request); - - log.trace("Received result [{}]", result); + @Override + public void onSuccess(PublishRequest request, PublishResult result) { + log.trace("Received result [{}]", result); + exchange.getMessage().setHeader(SnsConstants.MESSAGE_ID, result.getMessageId()); + callback.done(false); + } + }); - Message message = getMessageForResponse(exchange); - message.setHeader(SnsConstants.MESSAGE_ID, result.getMessageId()); + return false; } private String determineSubject(Exchange exchange) { @@ -130,12 +146,4 @@ public class SnsProducer extends DefaultProducer { return (SnsEndpoint) super.getEndpoint(); } - public static Message getMessageForResponse(final Exchange exchange) { - if (exchange.getPattern().isOutCapable()) { - Message out = exchange.getOut(); - out.copyFrom(exchange.getIn()); - return out; - } - return exchange.getIn(); - } } \ No newline at end of file