This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 676b86a Added AWS v2 KMS sink test case 676b86a is described below commit 676b86a3af107673bbe2dd873d25bbae00bae4b5 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Thu Jan 28 10:49:56 2021 +0100 Added AWS v2 KMS sink test case --- tests/itests-aws-v2/pom.xml | 5 +- .../v2/kms/sink/CamelAWSKMSPropertyFactory.java | 73 ++++++++++ .../aws/v2/kms/sink/CamelSinkAWSKMSITCase.java | 151 +++++++++++++++++++++ .../aws/v2/kms/sink/TestKMS2Configuration.java | 35 +++++ 4 files changed, 263 insertions(+), 1 deletion(-) diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml index 48f40f9..62048dc 100644 --- a/tests/itests-aws-v2/pom.xml +++ b/tests/itests-aws-v2/pom.xml @@ -83,7 +83,10 @@ <artifactId>camel-aws2-iam</artifactId> </dependency> - + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-aws2-kms</artifactId> + </dependency> </dependencies> <build> diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelAWSKMSPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelAWSKMSPropertyFactory.java new file mode 100644 index 0000000..9a0a4d5 --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelAWSKMSPropertyFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.kms.sink; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils; +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; +import org.apache.camel.test.infra.aws.common.AWSConfigs; + +public class CamelAWSKMSPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSKMSPropertyFactory> { + public static final Map<String, String> SPRING_STYLE = new HashMap<>(); + public static final Map<String, String> KAFKA_STYLE = new HashMap<>(); + + static { + SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-kms.accessKey"); + SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-kms.secretKey"); + SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-kms.region"); + + KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-kms.access-key"); + KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-kms.secret-key"); + KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-kms.region"); + } + + public CamelAWSKMSPropertyFactory withSinkPathLabel(String value) { + return setProperty("camel.sink.path.label", value); + } + + public CamelAWSKMSPropertyFactory withSinkEndpointOperation(String value) { + return setProperty("camel.sink.endpoint.operation", value); + } + + public CamelAWSKMSPropertyFactory withConfiguration(String value) { + return setProperty("camel.component.aws2-kms.configuration", classRef(value)); + } + + public CamelAWSKMSPropertyFactory withAmazonConfig(Properties amazonConfigs) { + return withAmazonConfig(amazonConfigs, this.SPRING_STYLE); + } + + public CamelAWSKMSPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) { + AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this); + + return this; + } + + public static CamelAWSKMSPropertyFactory basic() { + return new CamelAWSKMSPropertyFactory() + .withTasksMax(1) + .withName("CamelAws2kmsSinkConnector") + .withConnectorClass("org.apache.camel.kafkaconnector.aws2kms.CamelAws2kmsSinkConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + + } +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java new file mode 100644 index 0000000..fffc0e9 --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.kms.sink; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.kafkaconnector.CamelSinkTask; +import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.test.infra.aws.common.services.AWSService; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; +import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.kms.model.DescribeKeyRequest; +import software.amazon.awssdk.services.kms.model.DescribeKeyResponse; +import software.amazon.awssdk.services.kms.model.KeyListEntry; +import software.amazon.awssdk.services.kms.model.ListKeysResponse; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") +public class CamelSinkAWSKMSITCase extends CamelSinkAWSTestSupport { + + @RegisterExtension + public static AWSService awsService = AWSServiceFactory.createKMSService(); + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSKMSITCase.class); + + private String logicalName; + private KmsClient client; + + private volatile int received; + private final int expect = 10; + + @Override + protected Map<String, String> messageHeaders(String text, int current) { + Map<String, String> headers = new HashMap<>(); + + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSKeyId", + String.valueOf(current)); + + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSDescription", + "test key " + current); + + + return headers; + } + + @Override + protected void consumeMessages(CountDownLatch latch) { + try { + while (true) { + ListKeysResponse response = client.listKeys(); + List<KeyListEntry> keys = response.keys(); + + received = keys.size(); + for (KeyListEntry entry : keys) { + DescribeKeyRequest describeKeyRequest = DescribeKeyRequest.builder().keyId(entry.keyId()).build(); + + DescribeKeyResponse describeKeyResponse = client.describeKey(describeKeyRequest); + + LOG.info("Received key: {} / {}: {}", entry.keyId(), entry.keyArn(), + describeKeyResponse.keyMetadata().description()); + } + + if (received >= expect) { + return; + } + + if (!waitForData()) { + return; + } + } + } finally { + latch.countDown(); + } + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(110, TimeUnit.SECONDS)) { + // There is 1 default key from localstack, so the check here is different. + assertTrue(received >= expect, "Should have processed at least : " + expect + + " keys, but processed only " + received); + } else { + fail(String.format("Failed to receive the messages within the specified time: received %d of %d", + received, expect)); + } + } + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-aws2-kms-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + logicalName = "kms-" + TestUtils.randomWithRange(0, 100); + + client = AWSSDKClientUtils.newKMSClient(); + received = 0; + } + + + @Test + @Timeout(120) + public void testBasicSendReceive() throws Exception { + Properties amazonProperties = awsService.getConnectionProperties(); + String topicName = TestUtils.getDefaultTestTopic(this.getClass()); + + ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKMSPropertyFactory + .basic() + .withTopics(topicName) + .withAmazonConfig(amazonProperties) + .withConfiguration(TestKMS2Configuration.class.getName()) + .withSinkEndpointOperation("createKey") + .withSinkPathLabel(logicalName); + + runTest(connectorPropertyFactory, topicName, expect); + } +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/TestKMS2Configuration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/TestKMS2Configuration.java new file mode 100644 index 0000000..3e9c1a6 --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/TestKMS2Configuration.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.kms.sink; + +import org.apache.camel.component.aws2.kms.KMS2Configuration; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; +import software.amazon.awssdk.services.kms.KmsClient; + +public class TestKMS2Configuration extends KMS2Configuration { + private KmsClient client; + + @Override + public KmsClient getKmsClient() { + if (client == null) { + client = AWSSDKClientUtils.newKMSClient(); + } + + return client; + } +}