This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git

commit 77072a23a8a82231095493d1995d24c39ecc17f4
Author: Peter Palaga <[email protected]>
AuthorDate: Fri Feb 12 11:30:50 2021 +0100

    Test AWS 2 DynamoDB Streams
---
 .../aws2/ddb/it/Aws2DdbStreamResource.java         | 75 ++++++++++++++++++++++
 .../src/main/resources/application.properties      |  4 ++
 .../quarkus/component/aws2/ddb/it/Aws2DdbTest.java | 18 ++++++
 .../aws2/ddb/it/Aws2DdbTestEnvCustomizer.java      | 11 +++-
 .../test/support/aws2/Aws2TestEnvContext.java      |  3 +
 5 files changed, 108 insertions(+), 3 deletions(-)

diff --git a/integration-tests-aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamResource.java b/integration-tests-aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamResource.java
new file mode 100644
index 0000000..654f121
--- /dev/null
+++ b/integration-tests-aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamResource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.ddb.it;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.event.Observes;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import io.quarkus.runtime.StartupEvent;
+import org.apache.camel.ConsumerTemplate;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import software.amazon.awssdk.services.dynamodb.model.Record;
+import software.amazon.awssdk.services.dynamodb.model.StreamRecord;
+
+@Path("/aws2-ddbstream")
+@ApplicationScoped
+public class Aws2DdbStreamResource { /* Integration-test REST resource exposing records received from the aws2-ddbstream endpoint */
+
+    @ConfigProperty(name = "aws-ddb.table-name") /* Table name appended to the consumer URI in componentUri() */
+    String tableName;
+
+    @Inject
+    ConsumerTemplate consumerTemplate; /* Performs every receive against componentUri() */
+
+    void startup(@Observes StartupEvent event) {
+        /* Hit the consumer URI at application startup so that the consumer starts polling eagerly; result is discarded */
+        consumerTemplate.receiveBody(componentUri(), 1000); /* 1000 = receive timeout, presumably milliseconds - confirm */
+    }
+
+    @Path("/change")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public Map<String, String> change() { /* Receives one stream Record and flattens it into "key"/"old"/"new" entries */
+        Map<String, String> result = new LinkedHashMap<>(); /* LinkedHashMap keeps the insertion order used below */
+        Record record = consumerTemplate.receiveBody(componentUri(), 10000, Record.class);
+        if (record == null) {
+            return null; /* Nothing was received (presumably the 10000 timeout elapsed - confirm); no map is returned */
+        }
+        StreamRecord item = record.dynamodb();
+        result.put("key", item.keys().get("key").s()); /* String value of the "key" attribute from the record's keys */
+        if (item.hasOldImage()) {
+            result.put("old", item.oldImage().get("value").s()); /* "value" attribute of the old image, when present */
+        }
+        if (item.hasNewImage()) {
+            result.put("new", item.newImage().get("value").s()); /* "value" attribute of the new image, when present */
+        }
+        return result;
+    }
+
+    private String componentUri() { /* Consumer endpoint URI: "aws2-ddbstream://" followed by tableName */
+        return "aws2-ddbstream://" + tableName;
+    }
+
+}
diff --git a/integration-tests-aws2/aws2-ddb/src/main/resources/application.properties b/integration-tests-aws2/aws2-ddb/src/main/resources/application.properties
index 2cfb3d1..ea1c7a8 100644
--- a/integration-tests-aws2/aws2-ddb/src/main/resources/application.properties
+++ b/integration-tests-aws2/aws2-ddb/src/main/resources/application.properties
@@ -18,3 +18,7 @@
 camel.component.aws2-ddb.access-key=${AWS_ACCESS_KEY}
 camel.component.aws2-ddb.secret-key=${AWS_SECRET_KEY}
 camel.component.aws2-ddb.region=${AWS_REGION:us-east-1}
+
+camel.component.aws2-ddbstream.access-key=${AWS_ACCESS_KEY}
+camel.component.aws2-ddbstream.secret-key=${AWS_SECRET_KEY}
+camel.component.aws2-ddbstream.region=${AWS_REGION:us-east-1}
diff --git a/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTest.java b/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTest.java
index 655b249..86108a0 100644
--- a/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTest.java
+++ b/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTest.java
@@ -104,6 +104,24 @@ class Aws2DdbTest {
                 },
                 Matchers.is(204));
 
+        /* The above actions should trigger the following three change events */
+        RestAssured.get("/aws2-ddbstream/change")
+                .then()
+                .statusCode(200)
+                .body("key", Matchers.is(key))
+                .body("new", Matchers.is(msg));
+        RestAssured.get("/aws2-ddbstream/change")
+                .then()
+                .statusCode(200)
+                .body("key", Matchers.is(key))
+                .body("old", Matchers.is(msg))
+                .body("new", Matchers.is(newMsg));
+        RestAssured.get("/aws2-ddbstream/change")
+                .then()
+                .statusCode(200)
+                .body("key", Matchers.is(key))
+                .body("old", Matchers.is(newMsg));
+
     }
 
 }
diff --git a/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTestEnvCustomizer.java b/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTestEnvCustomizer.java
index 1cc52e9..a2102c1 100644
--- a/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTestEnvCustomizer.java
+++ b/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTestEnvCustomizer.java
@@ -25,20 +25,21 @@ import org.testcontainers.containers.localstack.LocalStackContainer.Service;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
 import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
 import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
-import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
 import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
 import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
 import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
 import software.amazon.awssdk.services.dynamodb.model.KeyType;
 import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
 import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
+import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
 import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;
 
 public class Aws2DdbTestEnvCustomizer implements Aws2TestEnvCustomizer {
 
     @Override
     public Service[] localstackServices() {
-        return new Service[] { Service.DYNAMODB };
+        return new Service[] { Service.DYNAMODB, Service.DYNAMODB_STREAMS };
     }
 
     @Override
@@ -50,7 +51,7 @@ public class Aws2DdbTestEnvCustomizer implements Aws2TestEnvCustomizer {
         final DynamoDbClient client = envContext.client(Service.DYNAMODB, DynamoDbClient::builder);
         {
             final String keyColumn = "key";
-            CreateTableResponse tbl = client.createTable(
+            client.createTable(
                     CreateTableRequest.builder()
                             .attributeDefinitions(AttributeDefinition.builder()
                                     .attributeName(keyColumn)
@@ -64,6 +65,10 @@ public class Aws2DdbTestEnvCustomizer implements Aws2TestEnvCustomizer {
                                     .readCapacityUnits(new Long(10))
                                     .writeCapacityUnits(new Long(10))
                                     .build())
+                            .streamSpecification(StreamSpecification.builder()
+                                    .streamEnabled(true)
+                                    .streamViewType(StreamViewType.NEW_AND_OLD_IMAGES)
+                                    .build())
                             .tableName(tableName)
                             .build());
 
diff --git 
a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java
 
b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java
index 4f7454d..7246bbb 100644
--- 
a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java
+++ 
b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java
@@ -64,6 +64,7 @@ public class Aws2TestEnvContext {
                 case SQS:
                 case SNS:
                 case DYNAMODB:
+                case DYNAMODB_STREAMS:
                     // TODO https://github.com/apache/camel-quarkus/issues/2216
                     break;
                 default:
@@ -149,6 +150,8 @@ public class Aws2TestEnvContext {
         switch (service) {
         case DYNAMODB:
             return "ddb";
+        case DYNAMODB_STREAMS:
+            return "ddbstream";
         default:
             return service.name().toLowerCase(Locale.ROOT);
         }

Reply via email to