This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5348a9600fcd1fb964dc390f60cc258c5505f8a6
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Tue Oct 16 13:49:32 2018 +0200

    CAMEL-12884 - Camel-AWS Lambda: Add support for event source mapping
---
 .../component/aws/lambda/LambdaConstants.java      |  3 +-
 .../component/aws/lambda/LambdaOperations.java     |  3 +-
 .../camel/component/aws/lambda/LambdaProducer.java | 57 ++++++++++++++++++----
 .../aws/lambda/AmazonLambdaClientMock.java         |  7 ++-
 .../aws/lambda/LambdaComponentSpringTest.java      | 17 +++++++
 .../component/aws/lambda/LambdaProducerTest.java   | 20 ++++++++
 .../lambda/LambdaComponentSpringTest-context.xml   |  5 +-
 7 files changed, 98 insertions(+), 14 deletions(-)

diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java
index 95ced62..ce64f5d 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java
@@ -43,5 +43,6 @@ public interface LambdaConstants {
     String SDK_REQUEST_TIMEOUT = "CamelAwsLambdaSdkRequestTimeout";
     String SECURITY_GROUP_IDS = "CamelAwsLambdaSecurityGroupIds";
     String SUBNET_IDS = "CamelAwsLambdaSubnetIds";
-
+    String EVENT_SOURCE_ARN = "CamelAwsLambdaEventSourceArn";
+    String EVENT_SOURCE_BATCH_SIZE = "CamelAwsLambdaEventSourceBatchSize";
 }
\ No newline at end of file
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java
index 1d24bfe..23a9491 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java
@@ -23,5 +23,6 @@ public enum LambdaOperations {
     createFunction,
     deleteFunction,
     invokeFunction,
-    updateFunction
+    updateFunction,
+    createEventSourceMapping
 }
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java
index 2fcb27c..d52a7c3 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.aws.lambda;
 
+import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.nio.ByteBuffer;
@@ -23,8 +25,19 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.lambda.AWSLambda;
+import com.amazonaws.services.lambda.model.CreateEventSourceMappingRequest;
+import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult;
 import com.amazonaws.services.lambda.model.CreateFunctionRequest;
 import com.amazonaws.services.lambda.model.CreateFunctionResult;
 import com.amazonaws.services.lambda.model.DeadLetterConfig;
@@ -42,16 +55,6 @@ import com.amazonaws.services.lambda.model.UpdateFunctionCodeRequest;
 import com.amazonaws.services.lambda.model.UpdateFunctionCodeResult;
 import com.amazonaws.services.lambda.model.VpcConfig;
 import com.amazonaws.util.IOUtils;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.CastUtils;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
 
 /**
  * A Producer which sends messages to the Amazon Web Service Lambda <a
@@ -86,6 +89,9 @@ public class LambdaProducer extends DefaultProducer {
         case updateFunction:
             updateFunction(getEndpoint().getAwsLambdaClient(), exchange);
             break;
+        case createEventSourceMapping:
+            createEventSourceMapping(getEndpoint().getAwsLambdaClient(), exchange);
+            break;
         default:
             throw new IllegalArgumentException("Unsupported operation");
         }
@@ -341,6 +347,37 @@ public class LambdaProducer extends DefaultProducer {
         message.setBody(result);
     }
     
+    private void createEventSourceMapping(AWSLambda lambdaClient, Exchange exchange) {
+        CreateEventSourceMappingResult result;
+        try {
+            CreateEventSourceMappingRequest request = new CreateEventSourceMappingRequest().withFunctionName(getConfiguration().getFunction());
+            if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_ARN))) {
+                request.withEventSourceArn(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_ARN, String.class));
+            } else {
+                throw new IllegalArgumentException("Event Source Arn must be specified");
+            }
+            if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE))) {
+                Integer batchSize = exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, Integer.class);
+                request.withBatchSize(batchSize);
+            }
+            if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT))) {
+                Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT, Integer.class);
+                request.withSdkClientExecutionTimeout(timeout);
+            }
+
+            if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT))) {
+                Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT, Integer.class);
+                request.withSdkRequestTimeout(timeout);
+            }
+            result = lambdaClient.createEventSourceMapping(request);
+        } catch (AmazonServiceException ase) {
+            LOG.trace("createEventSourceMapping command returned the error code {}", ase.getErrorCode());
+            throw ase;
+        }
+        Message message = getMessageForResponse(exchange);
+        message.setBody(result);
+    }
+    
     private LambdaOperations determineOperation(Exchange exchange) {
         LambdaOperations operation = exchange.getIn().getHeader(LambdaConstants.OPERATION, LambdaOperations.class);
         if (operation == null) {
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java
index 8c4ab2b..8e6ed76 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java
@@ -111,7 +111,12 @@ public class AmazonLambdaClientMock extends AbstractAWSLambda {
 
     @Override
     public CreateEventSourceMappingResult createEventSourceMapping(CreateEventSourceMappingRequest createEventSourceMappingRequest) {
-        throw new UnsupportedOperationException();
+        CreateEventSourceMappingResult result = new CreateEventSourceMappingResult();
+        result.setBatchSize(100);
+        result.setFunctionArn("arn:aws:lambda:eu-central-1:643534317684:function:" + createEventSourceMappingRequest.getFunctionName());
+        result.setState("Enabled");
+        result.setEventSourceArn("arn:aws:sqs:eu-central-1:643534317684:testqueue");
+        return result;
     }
 
     @Override
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java
index 400bd02..2c9f9fa 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.aws.lambda;
 
 import java.io.*;
+
+import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult;
 import com.amazonaws.services.lambda.model.CreateFunctionResult;
 import com.amazonaws.services.lambda.model.DeleteFunctionResult;
 import com.amazonaws.services.lambda.model.GetFunctionResult;
@@ -111,6 +113,21 @@ public class LambdaComponentSpringTest extends CamelSpringTestSupport {
         assertNotNull(exchange.getOut().getBody(String.class));
         assertEquals(exchange.getOut().getBody(String.class), "{\"Hello\":\"Camel\"}");
     }
+    
+    @Test
+    public void lambdaCreateEventSourceMappingTest() throws Exception {
+        Exchange exchange = template.send("direct:createEventSourceMapping", ExchangePattern.InOut, new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(LambdaConstants.EVENT_SOURCE_ARN, "arn:aws:sqs:eu-central-1:643534317684:testqueue");
+                exchange.getIn().setHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, 100);
+            }
+        });
+        assertMockEndpointsSatisfied();
+
+        CreateEventSourceMappingResult result = exchange.getOut().getBody(CreateEventSourceMappingResult.class);
+        assertEquals(result.getFunctionArn(), "arn:aws:lambda:eu-central-1:643534317684:function:GetHelloWithName");
+    }
 
 
     @Override
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java
index 006358e..590ad59 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.aws.lambda;
 
 import java.io.*;
+
+import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult;
 import com.amazonaws.services.lambda.model.CreateFunctionResult;
 import com.amazonaws.services.lambda.model.DeleteFunctionResult;
 import com.amazonaws.services.lambda.model.GetFunctionResult;
@@ -152,6 +154,21 @@ public class LambdaProducerTest extends CamelTestSupport {
         assertNotNull(exchange.getOut().getBody(String.class));
         assertEquals(exchange.getOut().getBody(String.class), "{\"Hello\":\"Camel\"}");
     }
+    
+    @Test
+    public void lambdaCreateEventSourceMappingTest() throws Exception {
+        Exchange exchange = template.send("direct:createEventSourceMapping", ExchangePattern.InOut, new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(LambdaConstants.EVENT_SOURCE_ARN, "arn:aws:sqs:eu-central-1:643534317684:testqueue");
+                exchange.getIn().setHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, 100);
+            }
+        });
+        assertMockEndpointsSatisfied();
+
+        CreateEventSourceMappingResult result = exchange.getOut().getBody(CreateEventSourceMappingResult.class);
+        assertEquals(result.getFunctionArn(), "arn:aws:lambda:eu-central-1:643534317684:function:GetHelloWithName");
+    }
 
 
     @Override
@@ -194,6 +211,9 @@ public class LambdaProducerTest extends CamelTestSupport {
                     
.to("aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&operation=updateFunction")
                     .to("mock:result");
 
+                from("direct:createEventSourceMapping")
+                .to("aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&operation=createEventSourceMapping")
+                .to("mock:result");
             }
         };
     }
diff --git a/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml b/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml
index 413404a..c3211dd 100644
--- a/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml
+++ b/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml
@@ -50,7 +50,10 @@
             <to uri="aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&amp;operation=deleteFunction"/>
         </route>
 
-
+        <route>
+            <from uri="direct:createEventSourceMapping"/>
+            <to uri="aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&amp;operation=createEventSourceMapping"/>
+        </route>
     </camelContext>
 
     <bean id="awsLambdaClient" class="org.apache.camel.component.aws.lambda.AmazonLambdaClientMock"/>

Reply via email to