This is an automated email from the ASF dual-hosted git repository. ppalaga pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/master by this push: new 83e5c76 Test AWS 2 Firehose 83e5c76 is described below commit 83e5c768b6196de7a7fb04990c3a40e2a22d24f7 Author: Peter Palaga <ppal...@redhat.com> AuthorDate: Tue Feb 16 11:02:14 2021 +0100 Test AWS 2 Firehose --- integration-tests-aws2/aws2-kinesis/pom.xml | 10 + .../kinesis/it/Aws2KinesisFirehoseResource.java | 65 ++++++ .../src/main/resources/application.properties | 4 + .../component/aws2/kinesis/it/Aws2KinesisTest.java | 92 ++++++++- .../kinesis/it/Aws2KinesisTestEnvCustomizer.java | 222 ++++++++++++++++++++- .../test/support/aws2/Aws2TestEnvContext.java | 57 ++++-- .../test/support/aws2/Aws2TestEnvCustomizer.java | 7 + .../test/support/aws2/Aws2TestResource.java | 11 +- integration-tests/aws2-grouped/pom.xml | 5 + pom.xml | 2 +- .../integration-test-pom.xml | 2 +- 11 files changed, 446 insertions(+), 31 deletions(-) diff --git a/integration-tests-aws2/aws2-kinesis/pom.xml b/integration-tests-aws2/aws2-kinesis/pom.xml index e850bed..d05c8be 100644 --- a/integration-tests-aws2/aws2-kinesis/pom.xml +++ b/integration-tests-aws2/aws2-kinesis/pom.xml @@ -83,6 +83,16 @@ <artifactId>awaitility</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>s3</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>iam</artifactId> + <scope>test</scope> + </dependency> <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory --> <dependency> diff --git a/integration-tests-aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseResource.java b/integration-tests-aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseResource.java new file mode 100644 index 0000000..6376226 --- /dev/null +++ b/integration-tests-aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseResource.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.aws2.kinesis.it; + +import java.net.URI; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.camel.ProducerTemplate; +import org.apache.camel.component.aws2.kinesis.Kinesis2Constants; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +@Path("/aws2-kinesis-firehose") +@ApplicationScoped +public class Aws2KinesisFirehoseResource { + + @ConfigProperty(name = "aws-kinesis-firehose.delivery-stream-name") + String deliveryStreamName; + + @Inject + ProducerTemplate producerTemplate; + + @Path("/send") + @POST + @Consumes(MediaType.TEXT_PLAIN) + @Produces(MediaType.TEXT_PLAIN) + public Response send(String message) throws Exception { + final String response = producerTemplate.requestBodyAndHeader( + componentUri(), + message, + Kinesis2Constants.PARTITION_KEY, + "foo-partition-key", + String.class); + return Response + .created(new URI("https://camel.apache.org/")) + .entity(response) + .build(); + } + + private String componentUri() { + return "aws2-kinesis-firehose://" + deliveryStreamName; + } + +} diff --git a/integration-tests-aws2/aws2-kinesis/src/main/resources/application.properties b/integration-tests-aws2/aws2-kinesis/src/main/resources/application.properties index 926eb0c..e7a4c2d 100644 --- a/integration-tests-aws2/aws2-kinesis/src/main/resources/application.properties +++ b/integration-tests-aws2/aws2-kinesis/src/main/resources/application.properties @@ -18,3 +18,7 @@ camel.component.aws2-kinesis.access-key=${AWS_ACCESS_KEY} camel.component.aws2-kinesis.secret-key=${AWS_SECRET_KEY} camel.component.aws2-kinesis.region=${AWS_REGION:us-east-1} + +camel.component.aws2-kinesis-firehose.access-key=${AWS_ACCESS_KEY} +camel.component.aws2-kinesis-firehose.secret-key=${AWS_SECRET_KEY} +camel.component.aws2-kinesis-firehose.region=${AWS_REGION:us-east-1} diff --git a/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java b/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java index 7efffec..d093e10 100644 --- a/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java +++ b/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java @@ -16,20 +16,43 @@ */ package org.apache.camel.quarkus.component.aws2.kinesis.it; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.TimeUnit; + import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.restassured.RestAssured; import io.restassured.http.ContentType; import org.apache.camel.quarkus.test.support.aws2.Aws2TestResource; +import org.apache.commons.lang3.RandomStringUtils; +import org.awaitility.Awaitility; +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; import org.hamcrest.Matchers; +import org.jboss.logging.Logger; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.S3Object; @QuarkusTest @QuarkusTestResource(Aws2TestResource.class) class Aws2KinesisTest { + private static final Logger LOG = Logger.getLogger(Aws2KinesisTest.class); + @Test - public void test() { + public void kinesis() { final String msg = "kinesis-" + java.util.UUID.randomUUID().toString().replace("-", ""); RestAssured.given() // .contentType(ContentType.TEXT) @@ -44,4 +67,71 @@ class Aws2KinesisTest { .body(Matchers.is(msg)); } + @Test + public void firehose() { + final String msg = RandomStringUtils.randomAlphanumeric(32 * 1024); + final String msgPrefix = msg.substring(0, 32); + final long maxDataBytes = Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB * 1024 * 1024; + long bytesSent = 0; + LOG.info("Sending " + Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB + " MB of data to firehose using chunk " + + msgPrefix + "..."); + final long deadline = System.currentTimeMillis() + (Aws2KinesisTestEnvCustomizer.BUFFERING_TIME_SEC * 1000); + while (bytesSent < maxDataBytes && System.currentTimeMillis() < deadline) { + /* Send at least 1MB of data but do not spend more than a minute by doing it. + * This is to overpass minimum buffering limits we have set via BufferingHints in the EnvCustomizer */ + RestAssured.given() // + .contentType(ContentType.TEXT) + .body(msg) + .post("/aws2-kinesis-firehose/send") // + .then() + .statusCode(201); + bytesSent += msg.length(); + LOG.info("Sent " + bytesSent + "/" + maxDataBytes + " bytes of data"); + } + LOG.info("Sent " + Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB + " MB of data to firehose"); + + final Config config = ConfigProvider.getConfig(); + + S3ClientBuilder builder = S3Client.builder() + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create( + config.getValue("camel.component.aws2-kinesis.access-key", String.class), + config.getValue("camel.component.aws2-kinesis.secret-key", String.class)))) + .region(Region.of(config.getValue("camel.component.aws2-kinesis.region", String.class))); + + config.getOptionalValue("camel.component.aws2-kinesis.uri-endpoint-override", + String.class).ifPresent(endpointOverride -> builder.endpointOverride(URI.create(endpointOverride))); + try (S3Client client = builder.build()) { + + final String bucketName = config.getValue("aws-kinesis.s3-bucket-name", String.class); + + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until( + () -> { + final ListObjectsResponse objects = client + .listObjects(ListObjectsRequest.builder().bucket(bucketName).build()); + final List<S3Object> objs = objects.contents(); + LOG.info("There are " + objs.size() + " objects in bucket " + bucketName); + for (S3Object obj : objs) { + LOG.info("Checking object " + obj.key() + " of size " + obj.size()); + try (ResponseInputStream<GetObjectResponse> o = client + .getObject(GetObjectRequest.builder().bucket(bucketName).key(obj.key()).build())) { + final StringBuilder sb = new StringBuilder(msg.length()); + final byte[] buf = new byte[1024]; + int len; + while ((len = o.read(buf)) >= 0 && sb.length() < msgPrefix.length()) { + sb.append(new String(buf, 0, len, StandardCharsets.UTF_8)); + } + final String foundContent = sb.toString(); + if (foundContent.startsWith(msgPrefix)) { + /* Yes, this is what we have sent */ + LOG.info("Found the expected content in object " + obj.key()); + return true; + } + } + } + return false; + }); + } + + } + } diff --git a/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTestEnvCustomizer.java b/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTestEnvCustomizer.java index 189465d..4486af8 100644 --- a/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTestEnvCustomizer.java +++ b/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTestEnvCustomizer.java @@ -16,33 +16,73 @@ */ package org.apache.camel.quarkus.component.aws2.kinesis.it; +import java.util.List; import java.util.Locale; +import java.util.concurrent.TimeUnit; import org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvContext; import org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvCustomizer; import org.apache.commons.lang3.RandomStringUtils; +import org.awaitility.Awaitility; +import org.jboss.logging.Logger; import org.testcontainers.containers.localstack.LocalStackContainer.Service; +import software.amazon.awssdk.services.firehose.FirehoseClient; +import software.amazon.awssdk.services.firehose.model.BufferingHints; +import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest; +import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamRequest; +import software.amazon.awssdk.services.firehose.model.DeliveryStreamStatus; +import software.amazon.awssdk.services.firehose.model.DeliveryStreamType; +import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamRequest; +import software.amazon.awssdk.services.firehose.model.InvalidArgumentException; +import software.amazon.awssdk.services.firehose.model.S3DestinationConfiguration; +import software.amazon.awssdk.services.iam.IamClient; +import software.amazon.awssdk.services.iam.model.AttachRolePolicyRequest; +import software.amazon.awssdk.services.iam.model.CreatePolicyRequest; +import software.amazon.awssdk.services.iam.model.CreateRoleRequest; +import software.amazon.awssdk.services.iam.model.DeletePolicyRequest; +import software.amazon.awssdk.services.iam.model.DeleteRoleRequest; +import software.amazon.awssdk.services.iam.model.DetachRolePolicyRequest; +import software.amazon.awssdk.services.iam.model.GetPolicyRequest; +import software.amazon.awssdk.services.iam.model.GetRoleRequest; +import software.amazon.awssdk.services.iam.waiters.IamWaiter; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.waiters.KinesisWaiter; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadBucketRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.s3.waiters.S3Waiter; public class Aws2KinesisTestEnvCustomizer implements Aws2TestEnvCustomizer { + public static final int BUFFERING_SIZE_MB = 1; + public static final int BUFFERING_TIME_SEC = 60; + private static final Logger LOG = Logger.getLogger(Aws2KinesisTestEnvCustomizer.class); @Override public Service[] localstackServices() { - return new Service[] { Service.KINESIS }; + return new Service[] { Service.KINESIS, Service.FIREHOSE, Service.S3, Service.IAM }; + } + + @Override + public Service[] exportCredentialsForLocalstackServices() { + return new Service[] { Service.KINESIS, Service.FIREHOSE }; } @Override public void customize(Aws2TestEnvContext envContext) { final String streamName = "camel-quarkus-" + RandomStringUtils.randomAlphanumeric(16).toLowerCase(Locale.ROOT); - envContext.property("aws-kinesis.stream-name", streamName); - - final KinesisClient client = envContext.client(Service.KINESIS, KinesisClient::builder); + final String streamArn; { + envContext.property("aws-kinesis.stream-name", streamName); + final KinesisClient client = envContext.client(Service.KINESIS, KinesisClient::builder); client.createStream( CreateStreamRequest.builder() .shardCount(1) @@ -50,13 +90,183 @@ public class Aws2KinesisTestEnvCustomizer implements Aws2TestEnvCustomizer { .build()); try (KinesisWaiter waiter = client.waiter()) { - waiter.waitUntilStreamExists(DescribeStreamRequest.builder() + streamArn = waiter.waitUntilStreamExists(DescribeStreamRequest.builder() .streamName(streamName) - .build()); + .build()) + .matched().response().get().streamDescription().streamARN(); } envContext.closeable(() -> client.deleteStream(DeleteStreamRequest.builder().streamName(streamName).build())); } + { + final S3Client s3Client = envContext.client(Service.S3, S3Client::builder); + + final String bucketName = "camel-quarkus-firehose-" + + RandomStringUtils.randomAlphanumeric(32).toLowerCase(Locale.ROOT); + final String bucketArn = "arn:aws:s3:::" + bucketName; + envContext.property("aws-kinesis.s3-bucket-name", bucketName); + s3Client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build()); + envContext.closeable(() -> s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build())); + envContext.closeable(() -> { + final ListObjectsResponse objects = s3Client.listObjects( + ListObjectsRequest.builder() + .bucket(bucketName) + .build()); + final List<S3Object> objs = objects.contents(); + LOG.info("Deleting " + objs.size() + " objects in bucket " + bucketName); + for (S3Object obj : objs) { + LOG.info("Deleting object " + obj.key()); + s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(obj.key()).build()); + } + }); + + try (S3Waiter w = s3Client.waiter()) { + w.waitUntilBucketExists(HeadBucketRequest.builder().bucket(bucketName).build()); + } + + final String deliveryStreamName = "camel-quarkus-firehose-delstr-" + + RandomStringUtils.randomAlphanumeric(16).toLowerCase(Locale.ROOT); + envContext.property("aws-kinesis-firehose.delivery-stream-name", deliveryStreamName); + + final String roleName = "s3-" + deliveryStreamName; + + final IamClient iamClient = envContext.client(Service.IAM, IamClient::builder); + final String roleArn = iamClient.createRole( + CreateRoleRequest.builder() + .roleName(roleName) + .path("/service-role/") + .assumeRolePolicyDocument("{\n" + + " \"Version\": \"2012-10-17\",\n" + + " \"Statement\": [\n" + + " {\n" + + " \"Sid\": \"sid" + RandomStringUtils.randomAlphanumeric(16) + "\",\n" + + " \"Effect\": \"Allow\",\n" + + " \"Principal\": {\n" + + " \"Service\": \"firehose.amazonaws.com\"\n" + + " },\n" + + " \"Action\": \"sts:AssumeRole\"\n" + + " }\n" + + " ]\n" + + "}") + .build()) + .role().arn(); + envContext.closeable(() -> iamClient.deleteRole(DeleteRoleRequest.builder().roleName(roleName).build())); + + try (IamWaiter w = iamClient.waiter()) { + w.waitUntilRoleExists(GetRoleRequest.builder().roleName(roleName).build()); + } + + final String policyName = "firehose-s3-policy-" + deliveryStreamName; + + final String policy = "{\n" + + " \"Version\": \"2012-10-17\",\n" + + " \"Statement\":\n" + + " [\n" + + " {\n" + + " \"Sid\": \"sid" + RandomStringUtils.randomAlphanumeric(16) + "\",\n" + + " \"Effect\": \"Allow\",\n" + + " \"Action\": [\n" + + " \"s3:AbortMultipartUpload\",\n" + + " \"s3:GetBucketLocation\",\n" + + " \"s3:GetObject\",\n" + + " \"s3:ListBucket\",\n" + + " \"s3:ListBucketMultipartUploads\",\n" + + " \"s3:PutObject\"\n" + + " ], \n" + + " \"Resource\": [\n" + + " \"arn:aws:s3:::" + bucketName + "\",\n" + + " \"arn:aws:s3:::" + bucketName + "/*\"\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"Sid\": \"sid" + RandomStringUtils.randomAlphanumeric(16) + "\",\n" + + " \"Effect\": \"Allow\",\n" + + " \"Action\": [\n" + + " \"kinesis:DescribeStream\",\n" + + " \"kinesis:GetShardIterator\",\n" + + " \"kinesis:GetRecords\",\n" + + " \"kinesis:ListShards\"\n" + + " ],\n" + + " \"Resource\": \"" + streamArn + "\"\n" + + " }\n" + + " ]\n" + + "}"; + final String policyArn = iamClient.createPolicy( + CreatePolicyRequest.builder() + .policyName(policyName) + .policyDocument(policy) + .build()) + .policy().arn(); + envContext.closeable(() -> iamClient.deletePolicy(DeletePolicyRequest.builder().policyArn(policyArn).build())); + + try (IamWaiter w = iamClient.waiter()) { + w.waitUntilPolicyExists(GetPolicyRequest.builder().policyArn(policyArn).build()); + } + + iamClient.attachRolePolicy( + AttachRolePolicyRequest.builder() + .policyArn(policyArn) + .roleName(roleName) + .build()); + envContext.closeable(() -> iamClient.detachRolePolicy( + DetachRolePolicyRequest.builder() + .roleName(roleName) + .policyArn(policyArn) + .build())); + + final FirehoseClient fhClient = envContext.client(Service.FIREHOSE, FirehoseClient::builder); + + /* + * Some of the dependency resources above needs some time to get visible for the firehose service + * So we need to retry creation of the delivery stream until it succeeds + */ + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until( + () -> { + try { + fhClient.createDeliveryStream( + CreateDeliveryStreamRequest.builder() + .deliveryStreamName(deliveryStreamName) + .s3DestinationConfiguration( + S3DestinationConfiguration.builder() + .bucketARN(bucketArn) + .roleARN(roleArn) + .bufferingHints( + BufferingHints.builder() + .intervalInSeconds(BUFFERING_TIME_SEC) + .sizeInMBs(BUFFERING_SIZE_MB) + .build()) + .build()) + .deliveryStreamType(DeliveryStreamType.DIRECT_PUT) + .build()); + LOG.info("Firehose delivery stream " + deliveryStreamName + " finally created"); + return true; + } catch (InvalidArgumentException e) { + LOG.info("Retrying the creation of delivery stream " + deliveryStreamName + " because " + + e.getMessage()); + return false; + } + }); + + /* + * There is no waiter for FirehoseClient so we are polling the state of the stream until the state is ACTIVE + * Feel free to improve if you see a more elegant way to do this + */ + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until( + () -> { + DeliveryStreamStatus status = fhClient.describeDeliveryStream( + DescribeDeliveryStreamRequest.builder() + .deliveryStreamName(deliveryStreamName) + .build()) + .deliveryStreamDescription().deliveryStreamStatus(); + LOG.info("Delivery stream " + deliveryStreamName + " status: " + status); + return status == DeliveryStreamStatus.ACTIVE; + }); + + envContext.closeable(() -> fhClient.deleteDeliveryStream( + DeleteDeliveryStreamRequest.builder().deliveryStreamName(deliveryStreamName).build())); + + } + } } diff --git a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java index 7246bbb..0b66ab2 100644 --- a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java +++ b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java @@ -16,6 +16,7 @@ */ package org.apache.camel.quarkus.test.support.aws2; +import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; @@ -47,31 +48,33 @@ public class Aws2TestEnvContext { private final Optional<LocalStackContainer> localstack; public Aws2TestEnvContext(String accessKey, String secretKey, String region, Optional<LocalStackContainer> localstack, - Service... services) { + Service[] exportCredentialsServices) { this.accessKey = accessKey; this.secretKey = secretKey; this.region = region; this.localstack = localstack; localstack.ifPresent(ls -> { - for (Service service : services) { + for (Service service : exportCredentialsServices) { String s = camelServiceAcronym(service); - properties.put("camel.component.aws2-" + s + ".access-key", accessKey); - properties.put("camel.component.aws2-" + s + ".secret-key", secretKey); - properties.put("camel.component.aws2-" + s + ".region", region); + if (s != null) { + properties.put("camel.component.aws2-" + s + ".access-key", accessKey); + properties.put("camel.component.aws2-" + s + ".secret-key", secretKey); + properties.put("camel.component.aws2-" + s + ".region", region); - switch (service) { - case SQS: - case SNS: - case DYNAMODB: - case DYNAMODB_STREAMS: - // TODO https://github.com/apache/camel-quarkus/issues/2216 - break; - default: - properties.put("camel.component.aws2-" + s + ".override-endpoint", "true"); - properties.put("camel.component.aws2-" + s + ".uri-endpoint-override", - ls.getEndpointOverride(service).toString()); - break; + switch (service) { + case SQS: + case SNS: + case DYNAMODB: + case DYNAMODB_STREAMS: + // TODO https://github.com/apache/camel-quarkus/issues/2216 + break; + default: + properties.put("camel.component.aws2-" + s + ".override-endpoint", "true"); + properties.put("camel.component.aws2-" + s + ".uri-endpoint-override", + ls.getEndpointOverride(service).toString()); + break; + } } } }); @@ -136,11 +139,19 @@ public class Aws2TestEnvContext { Supplier<B> builderSupplier) { B builder = builderSupplier.get() .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create( - accessKey, secretKey))) - .region(Region.of(region)); + accessKey, secretKey))); + builder.region(Region.of(region)); + if (localstack.isPresent()) { - builder.endpointOverride(localstack.get().getEndpointOverride(service)); + builder + .endpointOverride(localstack.get().getEndpointOverride(service)) + .region(Region.of(region)); + } else if (service == Service.IAM) { + /* Avoid UnknownHostException: iam.eu-central-1.amazonaws.com */ + builder.endpointOverride(URI.create("https://iam.amazonaws.com")); + builder.region(Region.of("us-east-1")); } + final C client = builder.build(); closeables.add(client); return client; @@ -152,8 +163,14 @@ public class Aws2TestEnvContext { return "ddb"; case DYNAMODB_STREAMS: return "ddbstream"; + case FIREHOSE: + return "kinesis-firehose"; default: return service.name().toLowerCase(Locale.ROOT); } } + + public String getRegion() { + return region; + } } diff --git a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvCustomizer.java b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvCustomizer.java index c479f00..bf073f8 100644 --- a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvCustomizer.java +++ b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvCustomizer.java @@ -30,6 +30,13 @@ public interface Aws2TestEnvCustomizer { Service[] localstackServices(); /** + * @return an array of Localstack services for which {@link Aws2TestEnvContext} should export credentials properties + */ + default Service[] exportCredentialsForLocalstackServices() { + return localstackServices(); + } + + /** * Customize the given {@link Aws2TestEnvContext} * * @param envContext the {@link Aws2TestEnvContext} to customize diff --git a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestResource.java b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestResource.java index 4e4e48a..f1be06f 100644 --- a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestResource.java +++ b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestResource.java @@ -58,6 +58,13 @@ public final class Aws2TestResource implements ContainerResourceLifecycleManager final Service[] services = customizers.stream() .map(Aws2TestEnvCustomizer::localstackServices) .flatMap((Service[] ss) -> Stream.of(ss)) + .distinct() + .toArray(Service[]::new); + + final Service[] exportCredentialsServices = customizers.stream() + .map(Aws2TestEnvCustomizer::exportCredentialsForLocalstackServices) + .flatMap((Service[] ss) -> Stream.of(ss)) + .distinct() .toArray(Service[]::new); LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.6")) @@ -65,7 +72,7 @@ public final class Aws2TestResource implements ContainerResourceLifecycleManager localstack.start(); envContext = new Aws2TestEnvContext(localstack.getAccessKey(), localstack.getSecretKey(), localstack.getRegion(), - Optional.of(localstack), services); + Optional.of(localstack), exportCredentialsServices); } else { if (!startMockBackend && !realCredentialsProvided) { @@ -73,7 +80,7 @@ public final class Aws2TestResource implements ContainerResourceLifecycleManager "Set AWS_ACCESS_KEY, AWS_SECRET_KEY and AWS_REGION env vars if you set CAMEL_QUARKUS_START_MOCK_BACKEND=false"); } MockBackendUtils.logRealBackendUsed(); - envContext = new Aws2TestEnvContext(realKey, realSecret, realRegion, Optional.empty()); + envContext = new Aws2TestEnvContext(realKey, realSecret, realRegion, Optional.empty(), new Service[0]); } customizers.forEach(customizer -> customizer.customize(envContext)); diff --git a/integration-tests/aws2-grouped/pom.xml b/integration-tests/aws2-grouped/pom.xml index c88fe59..7708feb 100644 --- a/integration-tests/aws2-grouped/pom.xml +++ b/integration-tests/aws2-grouped/pom.xml @@ -95,6 +95,11 @@ <artifactId>awaitility</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>iam</artifactId> + <scope>test</scope> + </dependency> <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory --> <dependency> diff --git a/pom.xml b/pom.xml index 0c9a9fd..c59d912 100644 --- a/pom.xml +++ b/pom.xml @@ -145,7 +145,7 @@ <zt-exec.version>1.11</zt-exec.version> <!-- Maven plugin versions (keep sorted alphabetically) --> - <cq-plugin.version>0.30.0</cq-plugin.version> + <cq-plugin.version>0.31.0</cq-plugin.version> <build-helper-maven-plugin.version>3.1.0</build-helper-maven-plugin.version> <exec-maven-plugin.version>3.0.0</exec-maven-plugin.version> <formatter-maven-plugin.version>2.11.0</formatter-maven-plugin.version> diff --git a/tooling/create-extension-templates/integration-test-pom.xml b/tooling/create-extension-templates/integration-test-pom.xml index 51d3e9a..c5eefb4 100644 --- a/tooling/create-extension-templates/integration-test-pom.xml +++ b/tooling/create-extension-templates/integration-test-pom.xml @@ -23,7 +23,7 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.camel.quarkus</groupId> - <artifactId>camel-quarkus-[#if nativeSupported]integration-tests[#else]build-parent-it[/#if]</artifactId> + <artifactId>camel-quarkus-[#if nativeSupported][=itestParentArtifactId][#else]build-parent-it[/#if]</artifactId> <version>[=version]</version> <relativePath>[#if nativeSupported]../pom.xml[#else]../../../poms/build-parent-it/pom.xml[/#if]</relativePath> </parent>