This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit 77072a23a8a82231095493d1995d24c39ecc17f4 Author: Peter Palaga <[email protected]> AuthorDate: Fri Feb 12 11:30:50 2021 +0100 Test AWS 2 DynamoDB Streams --- .../aws2/ddb/it/Aws2DdbStreamResource.java | 75 ++++++++++++++++++++++ .../src/main/resources/application.properties | 4 ++ .../quarkus/component/aws2/ddb/it/Aws2DdbTest.java | 18 ++++++ .../aws2/ddb/it/Aws2DdbTestEnvCustomizer.java | 11 +++- .../test/support/aws2/Aws2TestEnvContext.java | 3 + 5 files changed, 108 insertions(+), 3 deletions(-) diff --git a/integration-tests-aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamResource.java b/integration-tests-aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamResource.java new file mode 100644 index 0000000..654f121 --- /dev/null +++ b/integration-tests-aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamResource.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.aws2.ddb.it; + +import java.util.LinkedHashMap; +import java.util.Map; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.event.Observes; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import io.quarkus.runtime.StartupEvent; +import org.apache.camel.ConsumerTemplate; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import software.amazon.awssdk.services.dynamodb.model.Record; +import software.amazon.awssdk.services.dynamodb.model.StreamRecord; + +@Path("/aws2-ddbstream") +@ApplicationScoped +public class Aws2DdbStreamResource { + + @ConfigProperty(name = "aws-ddb.table-name") + String tableName; + + @Inject + ConsumerTemplate consumerTemplate; + + void startup(@Observes StartupEvent event) { + /* Hit the consumer URI at application startup so that the consumer starts polling eagerly */ + consumerTemplate.receiveBody(componentUri(), 1000); + } + + @Path("/change") + @GET + @Produces(MediaType.APPLICATION_JSON) + public Map<String, String> change() { + Map<String, String> result = new LinkedHashMap<>(); + Record record = consumerTemplate.receiveBody(componentUri(), 10000, Record.class); + if (record == null) { + return null; + } + StreamRecord item = record.dynamodb(); + result.put("key", item.keys().get("key").s()); + if (item.hasOldImage()) { + result.put("old", item.oldImage().get("value").s()); + } + if (item.hasNewImage()) { + result.put("new", item.newImage().get("value").s()); + } + return result; + } + + private String componentUri() { + return "aws2-ddbstream://" + tableName; + } + +} diff --git a/integration-tests-aws2/aws2-ddb/src/main/resources/application.properties b/integration-tests-aws2/aws2-ddb/src/main/resources/application.properties index 2cfb3d1..ea1c7a8 100644 --- a/integration-tests-aws2/aws2-ddb/src/main/resources/application.properties +++ b/integration-tests-aws2/aws2-ddb/src/main/resources/application.properties @@ -18,3 +18,7 @@ camel.component.aws2-ddb.access-key=${AWS_ACCESS_KEY} camel.component.aws2-ddb.secret-key=${AWS_SECRET_KEY} camel.component.aws2-ddb.region=${AWS_REGION:us-east-1} + +camel.component.aws2-ddbstream.access-key=${AWS_ACCESS_KEY} +camel.component.aws2-ddbstream.secret-key=${AWS_SECRET_KEY} +camel.component.aws2-ddbstream.region=${AWS_REGION:us-east-1} diff --git a/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTest.java b/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTest.java index 655b249..86108a0 100644 --- a/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTest.java +++ b/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTest.java @@ -104,6 +104,24 @@ class Aws2DdbTest { }, Matchers.is(204)); + /* The above actions should trigger the following three change events */ + RestAssured.get("/aws2-ddbstream/change") + .then() + .statusCode(200) + .body("key", Matchers.is(key)) + .body("new", Matchers.is(msg)); + RestAssured.get("/aws2-ddbstream/change") + .then() + .statusCode(200) + .body("key", Matchers.is(key)) + .body("old", Matchers.is(msg)) + .body("new", Matchers.is(newMsg)); + RestAssured.get("/aws2-ddbstream/change") + .then() + .statusCode(200) + .body("key", Matchers.is(key)) + .body("old", Matchers.is(newMsg)); + } } diff --git a/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTestEnvCustomizer.java b/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTestEnvCustomizer.java index 1cc52e9..a2102c1 100644 --- a/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTestEnvCustomizer.java +++ b/integration-tests-aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTestEnvCustomizer.java @@ -25,20 +25,21 @@ import org.testcontainers.containers.localstack.LocalStackContainer.Service; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; -import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; +import software.amazon.awssdk.services.dynamodb.model.StreamSpecification; +import software.amazon.awssdk.services.dynamodb.model.StreamViewType; import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter; public class Aws2DdbTestEnvCustomizer implements Aws2TestEnvCustomizer { @Override public Service[] localstackServices() { - return new Service[] { Service.DYNAMODB }; + return new Service[] { Service.DYNAMODB, Service.DYNAMODB_STREAMS }; } @Override @@ -50,7 +51,7 @@ public class Aws2DdbTestEnvCustomizer implements Aws2TestEnvCustomizer { final DynamoDbClient client = envContext.client(Service.DYNAMODB, DynamoDbClient::builder); { final String keyColumn = "key"; - CreateTableResponse tbl = client.createTable( + client.createTable( CreateTableRequest.builder() .attributeDefinitions(AttributeDefinition.builder() .attributeName(keyColumn) @@ -64,6 +65,10 @@ public class Aws2DdbTestEnvCustomizer implements Aws2TestEnvCustomizer { .readCapacityUnits(new Long(10)) .writeCapacityUnits(new Long(10)) .build()) + .streamSpecification(StreamSpecification.builder() + .streamEnabled(true) + .streamViewType(StreamViewType.NEW_AND_OLD_IMAGES) + .build()) .tableName(tableName) .build()); diff --git a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java index 4f7454d..7246bbb 100644 --- a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java +++ b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java @@ -64,6 +64,7 @@ public class Aws2TestEnvContext { case SQS: case SNS: case DYNAMODB: + case DYNAMODB_STREAMS: // TODO https://github.com/apache/camel-quarkus/issues/2216 break; default: @@ -149,6 +150,8 @@ public class Aws2TestEnvContext { switch (service) { case DYNAMODB: return "ddb"; + case DYNAMODB_STREAMS: + return "ddbstream"; default: return service.name().toLowerCase(Locale.ROOT); }
