This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 676b86a  Added AWS v2 KMS sink test case
676b86a is described below

commit 676b86a3af107673bbe2dd873d25bbae00bae4b5
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Thu Jan 28 10:49:56 2021 +0100

    Added AWS v2 KMS sink test case
---
 tests/itests-aws-v2/pom.xml                        |   5 +-
 .../v2/kms/sink/CamelAWSKMSPropertyFactory.java    |  73 ++++++++++
 .../aws/v2/kms/sink/CamelSinkAWSKMSITCase.java     | 151 +++++++++++++++++++++
 .../aws/v2/kms/sink/TestKMS2Configuration.java     |  35 +++++
 4 files changed, 263 insertions(+), 1 deletion(-)

diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml
index 48f40f9..62048dc 100644
--- a/tests/itests-aws-v2/pom.xml
+++ b/tests/itests-aws-v2/pom.xml
@@ -83,7 +83,10 @@
             <artifactId>camel-aws2-iam</artifactId>
         </dependency>
 
-
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-aws2-kms</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelAWSKMSPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelAWSKMSPropertyFactory.java
new file mode 100644
index 0000000..9a0a4d5
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelAWSKMSPropertyFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.kms.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+
+/**
+ * Fluent property factory used by the integration tests to assemble the
+ * configuration of the Camel AWS v2 KMS sink connector.
+ */
+public class CamelAWSKMSPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSKMSPropertyFactory> {
+    /** Maps the common AWS config keys to the camelCase {@code camel.component.aws2-kms.*} option names. */
+    public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+    /** Maps the common AWS config keys to the dash-separated {@code camel.component.aws2-kms.*} option names. */
+    public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+    static {
+        SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-kms.accessKey");
+        SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-kms.secretKey");
+        SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-kms.region");
+
+        KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-kms.access-key");
+        KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-kms.secret-key");
+        KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-kms.region");
+    }
+
+    /**
+     * Sets the {@code camel.sink.path.label} property.
+     *
+     * @param value the label to use
+     * @return this factory
+     */
+    public CamelAWSKMSPropertyFactory withSinkPathLabel(String value) {
+        return setProperty("camel.sink.path.label", value);
+    }
+
+    /**
+     * Sets the {@code camel.sink.endpoint.operation} property.
+     *
+     * @param value the operation name
+     * @return this factory
+     */
+    public CamelAWSKMSPropertyFactory withSinkEndpointOperation(String value) {
+        return setProperty("camel.sink.endpoint.operation", value);
+    }
+
+    /**
+     * Sets the {@code camel.component.aws2-kms.configuration} property.
+     *
+     * @param value the configuration class name; it is passed through {@code classRef}
+     *              (defined outside this class -- presumably turns it into a class reference, confirm)
+     * @return this factory
+     */
+    public CamelAWSKMSPropertyFactory withConfiguration(String value) {
+        return setProperty("camel.component.aws2-kms.configuration", classRef(value));
+    }
+
+    /**
+     * Applies the given AWS properties using the {@link #SPRING_STYLE} option names.
+     *
+     * @param amazonConfigs the AWS connection properties
+     * @return this factory
+     */
+    public CamelAWSKMSPropertyFactory withAmazonConfig(Properties amazonConfigs) {
+        // SPRING_STYLE is static: reference it directly rather than through "this".
+        return withAmazonConfig(amazonConfigs, SPRING_STYLE);
+    }
+
+    /**
+     * Applies the given AWS properties using the supplied key-to-option-name mapping.
+     * The work is delegated to {@code AWSPropertiesUtils.setCommonProperties}.
+     *
+     * @param amazonConfigs the AWS connection properties
+     * @param style the mapping from AWS config keys to connector property names
+     * @return this factory
+     */
+    public CamelAWSKMSPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+
+        return this;
+    }
+
+    /**
+     * Creates a factory pre-populated with the connector name, connector class,
+     * a single task and String key/value converters.
+     *
+     * @return a new factory with the basic settings applied
+     */
+    public static CamelAWSKMSPropertyFactory basic() {
+        return new CamelAWSKMSPropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelAws2kmsSinkConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.aws2kms.CamelAws2kmsSinkConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
new file mode 100644
index 0000000..fffc0e9
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.kms.sink;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.awssdk.services.kms.model.DescribeKeyRequest;
+import software.amazon.awssdk.services.kms.model.DescribeKeyResponse;
+import software.amazon.awssdk.services.kms.model.KeyListEntry;
+import software.amazon.awssdk.services.kms.model.ListKeysResponse;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
+public class CamelSinkAWSKMSITCase extends CamelSinkAWSTestSupport {
+
+    @RegisterExtension
+    public static AWSService awsService = AWSServiceFactory.createKMSService();
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSinkAWSKMSITCase.class);
+
+    private String logicalName;
+    private KmsClient client;
+
+    private volatile int received;
+    private final int expect = 10;
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> headers = new HashMap<>();
+
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSKeyId",
+                String.valueOf(current));
+
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"CamelAwsKMSDescription",
+                "test key " + current);
+
+
+        return headers;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            while (true) {
+                ListKeysResponse response = client.listKeys();
+                List<KeyListEntry> keys = response.keys();
+
+                received = keys.size();
+                for (KeyListEntry entry : keys) {
+                    DescribeKeyRequest describeKeyRequest = 
DescribeKeyRequest.builder().keyId(entry.keyId()).build();
+
+                    DescribeKeyResponse describeKeyResponse = 
client.describeKey(describeKeyRequest);
+
+                    LOG.info("Received key: {} / {}: {}", entry.keyId(), 
entry.keyArn(),
+                            describeKeyResponse.keyMetadata().description());
+                }
+
+                if (received >= expect) {
+                    return;
+                }
+
+                if (!waitForData()) {
+                    return;
+                }
+            }
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws 
InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            // There is 1 default key from localstack, so the check here is 
different.
+            assertTrue(received >= expect, "Should have processed at least : " 
+ expect
+                    + " keys, but processed only " + received);
+        } else {
+            fail(String.format("Failed to receive the messages within the 
specified time: received %d of %d",
+                    received, expect));
+        }
+    }
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-aws2-kms-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        logicalName = "kms-" + TestUtils.randomWithRange(0, 100);
+
+        client = AWSSDKClientUtils.newKMSClient();
+        received = 0;
+    }
+
+
+    @Test
+    @Timeout(120)
+    public void testBasicSendReceive() throws Exception {
+        Properties amazonProperties = awsService.getConnectionProperties();
+        String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+        ConnectorPropertyFactory connectorPropertyFactory = 
CamelAWSKMSPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withAmazonConfig(amazonProperties)
+                .withConfiguration(TestKMS2Configuration.class.getName())
+                .withSinkEndpointOperation("createKey")
+                .withSinkPathLabel(logicalName);
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/TestKMS2Configuration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/TestKMS2Configuration.java
new file mode 100644
index 0000000..3e9c1a6
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/TestKMS2Configuration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.kms.sink;
+
+import org.apache.camel.component.aws2.kms.KMS2Configuration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import software.amazon.awssdk.services.kms.KmsClient;
+
+/**
+ * {@link KMS2Configuration} for tests: hands out a {@link KmsClient} created
+ * on first use through {@code AWSSDKClientUtils.newKMSClient()} and reuses it
+ * on later calls.
+ */
+public class TestKMS2Configuration extends KMS2Configuration {
+    private KmsClient cachedClient;
+
+    /**
+     * @return the KMS client, created on the first call and cached afterwards
+     */
+    @Override
+    public KmsClient getKmsClient() {
+        if (cachedClient != null) {
+            return cachedClient;
+        }
+
+        cachedClient = AWSSDKClientUtils.newKMSClient();
+        return cachedClient;
+    }
+}

Reply via email to