This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5348a9600fcd1fb964dc390f60cc258c5505f8a6 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Tue Oct 16 13:49:32 2018 +0200 CAMEL-12884 - Camel-AWS Lambda: Add support for event source mapping --- .../component/aws/lambda/LambdaConstants.java | 3 +- .../component/aws/lambda/LambdaOperations.java | 3 +- .../camel/component/aws/lambda/LambdaProducer.java | 57 ++++++++++++++++++---- .../aws/lambda/AmazonLambdaClientMock.java | 7 ++- .../aws/lambda/LambdaComponentSpringTest.java | 17 +++++++ .../component/aws/lambda/LambdaProducerTest.java | 20 ++++++++ .../lambda/LambdaComponentSpringTest-context.xml | 5 +- 7 files changed, 98 insertions(+), 14 deletions(-) diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java index 95ced62..ce64f5d 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java @@ -43,5 +43,6 @@ public interface LambdaConstants { String SDK_REQUEST_TIMEOUT = "CamelAwsLambdaSdkRequestTimeout"; String SECURITY_GROUP_IDS = "CamelAwsLambdaSecurityGroupIds"; String SUBNET_IDS = "CamelAwsLambdaSubnetIds"; - + String EVENT_SOURCE_ARN = "CamelAwsLambdaEventSourceArn"; + String EVENT_SOURCE_BATCH_SIZE = "CamelAwsLambdaEventSourceBatchSize"; } \ No newline at end of file diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java index 1d24bfe..23a9491 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java @@ -23,5 +23,6 @@ public enum LambdaOperations { createFunction, deleteFunction, invokeFunction, - updateFunction + updateFunction, + createEventSourceMapping } diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java index 2fcb27c..d52a7c3 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.aws.lambda; +import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; + import java.io.File; import java.io.FileInputStream; import java.nio.ByteBuffer; @@ -23,8 +25,19 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.CastUtils; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.amazonaws.AmazonServiceException; import com.amazonaws.services.lambda.AWSLambda; +import com.amazonaws.services.lambda.model.CreateEventSourceMappingRequest; +import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult; import com.amazonaws.services.lambda.model.CreateFunctionRequest; import com.amazonaws.services.lambda.model.CreateFunctionResult; import com.amazonaws.services.lambda.model.DeadLetterConfig; @@ -42,16 +55,6 @@ import com.amazonaws.services.lambda.model.UpdateFunctionCodeRequest; import com.amazonaws.services.lambda.model.UpdateFunctionCodeResult; import com.amazonaws.services.lambda.model.VpcConfig; import com.amazonaws.util.IOUtils; -import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.impl.DefaultProducer; -import org.apache.camel.util.CastUtils; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; /** * A Producer which sends messages to the Amazon Web Service Lambda <a @@ -86,6 +89,9 @@ public class LambdaProducer extends DefaultProducer { case updateFunction: updateFunction(getEndpoint().getAwsLambdaClient(), exchange); break; + case createEventSourceMapping: + createEventSourceMapping(getEndpoint().getAwsLambdaClient(), exchange); + break; default: throw new IllegalArgumentException("Unsupported operation"); } @@ -341,6 +347,37 @@ public class LambdaProducer extends DefaultProducer { message.setBody(result); } + private void createEventSourceMapping(AWSLambda lambdaClient, Exchange exchange) { + CreateEventSourceMappingResult result; + try { + CreateEventSourceMappingRequest request = new CreateEventSourceMappingRequest().withFunctionName(getConfiguration().getFunction()); + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_ARN))) { + request.withEventSourceArn(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_ARN, String.class)); + } else { + throw new IllegalArgumentException("Event Source Arn must be specified"); + } + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE))) { + Integer batchSize = exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, Integer.class); + request.withBatchSize(batchSize); + } + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT))) { + Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT, Integer.class); + request.withSdkClientExecutionTimeout(timeout); + } + + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT))) { + Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT, Integer.class); + request.withSdkRequestTimeout(timeout); + } + result = lambdaClient.createEventSourceMapping(request); + } catch (AmazonServiceException ase) { + LOG.trace("createEventSourceMapping command returned the error code {}", ase.getErrorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + message.setBody(result); + } + private LambdaOperations determineOperation(Exchange exchange) { LambdaOperations operation = exchange.getIn().getHeader(LambdaConstants.OPERATION, LambdaOperations.class); if (operation == null) { diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java index 8c4ab2b..8e6ed76 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java @@ -111,7 +111,12 @@ public class AmazonLambdaClientMock extends AbstractAWSLambda { @Override public CreateEventSourceMappingResult createEventSourceMapping(CreateEventSourceMappingRequest createEventSourceMappingRequest) { - throw new UnsupportedOperationException(); + CreateEventSourceMappingResult result = new CreateEventSourceMappingResult(); + result.setBatchSize(100); + result.setFunctionArn("arn:aws:lambda:eu-central-1:643534317684:function:" + createEventSourceMappingRequest.getFunctionName()); + result.setState("Enabled"); + result.setEventSourceArn("arn:aws:sqs:eu-central-1:643534317684:testqueue"); + return result; } @Override diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java index 400bd02..2c9f9fa 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java @@ -17,6 +17,8 @@ package org.apache.camel.component.aws.lambda; import java.io.*; + +import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult; import com.amazonaws.services.lambda.model.CreateFunctionResult; import com.amazonaws.services.lambda.model.DeleteFunctionResult; import com.amazonaws.services.lambda.model.GetFunctionResult; @@ -111,6 +113,21 @@ public class LambdaComponentSpringTest extends CamelSpringTestSupport { assertNotNull(exchange.getOut().getBody(String.class)); assertEquals(exchange.getOut().getBody(String.class), "{\"Hello\":\"Camel\"}"); } + + @Test + public void lambdaCreateEventSourceMappingTest() throws Exception { + Exchange exchange = template.send("direct:createEventSourceMapping", ExchangePattern.InOut, new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(LambdaConstants.EVENT_SOURCE_ARN, "arn:aws:sqs:eu-central-1:643534317684:testqueue"); + exchange.getIn().setHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, 100); + } + }); + assertMockEndpointsSatisfied(); + + CreateEventSourceMappingResult result = exchange.getOut().getBody(CreateEventSourceMappingResult.class); + assertEquals(result.getFunctionArn(), "arn:aws:lambda:eu-central-1:643534317684:function:GetHelloWithName"); + } @Override diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java index 006358e..590ad59 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java @@ -17,6 +17,8 @@ package org.apache.camel.component.aws.lambda; import java.io.*; + +import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult; import com.amazonaws.services.lambda.model.CreateFunctionResult; import com.amazonaws.services.lambda.model.DeleteFunctionResult; import com.amazonaws.services.lambda.model.GetFunctionResult; @@ -152,6 +154,21 @@ public class LambdaProducerTest extends CamelTestSupport { assertNotNull(exchange.getOut().getBody(String.class)); assertEquals(exchange.getOut().getBody(String.class), "{\"Hello\":\"Camel\"}"); } + + @Test + public void lambdaCreateEventSourceMappingTest() throws Exception { + Exchange exchange = template.send("direct:createEventSourceMapping", ExchangePattern.InOut, new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(LambdaConstants.EVENT_SOURCE_ARN, "arn:aws:sqs:eu-central-1:643534317684:testqueue"); + exchange.getIn().setHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, 100); + } + }); + assertMockEndpointsSatisfied(); + + CreateEventSourceMappingResult result = exchange.getOut().getBody(CreateEventSourceMappingResult.class); + assertEquals(result.getFunctionArn(), "arn:aws:lambda:eu-central-1:643534317684:function:GetHelloWithName"); + } @Override @@ -194,6 +211,9 @@ public class LambdaProducerTest extends CamelTestSupport { .to("aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&operation=updateFunction") .to("mock:result"); + from("direct:createEventSourceMapping") + .to("aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&operation=createEventSourceMapping") + .to("mock:result"); } }; } diff --git a/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml b/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml index 413404a..c3211dd 100644 --- a/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml +++ b/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml @@ -50,7 +50,10 @@ <to uri="aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&operation=deleteFunction"/> </route> - + <route> + <from uri="direct:createEventSourceMapping"/> + <to uri="aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&operation=createEventSourceMapping"/> + </route> </camelContext> <bean id="awsLambdaClient" class="org.apache.camel.component.aws.lambda.AmazonLambdaClientMock"/>