This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new db4fa44  Added new integration test for AWS 2 EC2
db4fa44 is described below

commit db4fa44ba9a622fa3b879d30fd77adafb5582bb5
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Jan 22 14:51:26 2021 +0100

    Added new integration test for AWS 2 EC2
---
 tests/itests-aws-v2/pom.xml                        |  7 ++
 .../aws/v2/common/CamelSinkAWSTestSupport.java     | 12 +++
 .../aws/v2/cw/sink/CamelSinkAWSCWITCase.java       |  7 +-
 .../v2/ec2/sink/CamelAWSEC2PropertyFactory.java    | 73 +++++++++++++++++
 .../sink/CamelSinkAWSEC2ITCase.java}               | 91 ++++++++++------------
 .../aws/v2/ec2/sink/TestEC2Configuration.java      | 35 +++++++++
 6 files changed, 168 insertions(+), 57 deletions(-)

diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml
index caf3c28..8669c50 100644
--- a/tests/itests-aws-v2/pom.xml
+++ b/tests/itests-aws-v2/pom.xml
@@ -72,6 +72,13 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-aws2-cw</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-aws2-ec2</artifactId>
+        </dependency>
+
+
     </dependencies>
 
     <build>
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
index a66a474..6f77de6 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.kafkaconnector.aws.v2.common;
 
+import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -72,6 +73,17 @@ public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
         verifyMessages(latch);
     }
 
+    protected boolean waitForData() {
+        try {
+            Thread.sleep(Duration.ofSeconds(1).toMillis());
+            return true;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            return false;
+        }
+    }
+
     protected abstract void consumeMessages(CountDownLatch latch);
 
     protected abstract void verifyMessages(CountDownLatch latch) throws InterruptedException;
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
index 0888b10..71f56f1 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
 
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -113,11 +112,7 @@ public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
                     }
                 }
 
-                try {
-                    Thread.sleep(Duration.ofSeconds(1).toMillis());
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-
+                if (!waitForData()) {
                     break;
                 }
             }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelAWSEC2PropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelAWSEC2PropertyFactory.java
new file mode 100644
index 0000000..3a35bbb
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelAWSEC2PropertyFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.ec2.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+
+public class CamelAWSEC2PropertyFactory extends SinkConnectorPropertyFactory<CamelAWSEC2PropertyFactory> {
+    public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+    public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+    static {
+        SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-ec2.accessKey");
+        SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-ec2.secretKey");
+        SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-ec2.region");
+
+        KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-ec2.access-key");
+        KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-ec2.secret-key");
+        KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-ec2.region");
+    }
+
+    public CamelAWSEC2PropertyFactory withSinkPathLabel(String value) {
+        return setProperty("camel.sink.path.label", value);
+    }
+
+    public CamelAWSEC2PropertyFactory withSinkEndpointOperation(String value) {
+        return setProperty("camel.sink.endpoint.operation", value);
+    }
+
+    public CamelAWSEC2PropertyFactory withConfiguration(String value) {
+        return setProperty("camel.component.aws2-ec2.configuration", classRef(value));
+    }
+
+    public CamelAWSEC2PropertyFactory withAmazonConfig(Properties amazonConfigs) {
+        return withAmazonConfig(amazonConfigs, this.SPRING_STYLE);
+    }
+
+    public CamelAWSEC2PropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+
+        return this;
+    }
+
+    public static CamelAWSEC2PropertyFactory basic() {
+        return new CamelAWSEC2PropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelAws2ec2SinkConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.aws2ec2.CamelAws2ec2SinkConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
similarity index 60%
copy from tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
copy to tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
index 0888b10..22176b0 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
+package org.apache.camel.kafkaconnector.aws.v2.ec2.sink;
 
-import java.time.Duration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
+import org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguration;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -33,47 +34,37 @@ import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
 import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
-import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
-import software.amazon.awssdk.services.cloudwatch.model.Dimension;
-import software.amazon.awssdk.services.cloudwatch.model.ListMetricsRequest;
-import software.amazon.awssdk.services.cloudwatch.model.ListMetricsResponse;
-import software.amazon.awssdk.services.cloudwatch.model.Metric;
+import software.amazon.awssdk.services.ec2.Ec2Client;
+import software.amazon.awssdk.services.ec2.model.DescribeInstanceStatusRequest;
+import software.amazon.awssdk.services.ec2.model.InstanceStatus;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
-@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
-
+public class CamelSinkAWSEC2ITCase extends CamelSinkAWSTestSupport {
     @RegisterExtension
-    public static AWSService awsService = AWSServiceFactory.createCloudWatchService();
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSCWITCase.class);
+    public static AWSService awsService = AWSServiceFactory.createEC2Service();
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSEC2ITCase.class);
 
-    private CloudWatchClient client;
-    private String namespace;
-    private String metricName = "test-metric";
+    private Ec2Client client;
+    private String logicalName;
 
     private volatile int received;
     private final int expect = 10;
 
     @Override
     protected String[] getConnectorsInTest() {
-        return new String[] {"camel-aws2-cw-kafka-connector"};
+        return new String[] {"camel-aws2-ec2-kafka-connector"};
     }
 
     @BeforeEach
     public void setUp() {
-        client = AWSSDKClientUtils.newCloudWatchClient();
-
-        namespace = "cw-" + TestUtils.randomWithRange(0, 1000);
-        LOG.debug("Using namespace {} for the test", namespace);
+        client = AWSSDKClientUtils.newEC2Client();
+        logicalName = "ec2-" + TestUtils.randomWithRange(1, 100);
 
         received = 0;
     }
@@ -82,9 +73,12 @@ public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
     protected Map<String, String> messageHeaders(String text, int current) {
         Map<String, String> headers = new HashMap<>();
 
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionName",
-                "test-dimension-" + current);
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionValue", String.valueOf(current));
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2ImageId",
+                "image-id-" + current);
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceType", "T1_MICRO");
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMinCount", "1");
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMaxCount", "1");
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceSecurityGroups", "default");
 
         return headers;
     }
@@ -92,32 +86,23 @@ public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
     @Override
     protected void consumeMessages(CountDownLatch latch) {
         try {
-            ListMetricsRequest request = ListMetricsRequest.builder()
-                    .namespace(namespace)
-                    .metricName(metricName)
-                    .build();
-
             while (true) {
-                ListMetricsResponse response = client.listMetrics(request);
+                DescribeInstanceStatusRequest request = DescribeInstanceStatusRequest.builder()
+                        .includeAllInstances(true)
+                        .build();
+                List<InstanceStatus> statusList = client.describeInstanceStatus(request).instanceStatuses();
 
-                for (Metric metric : response.metrics()) {
-                    LOG.info("Retrieved metric {}", metric.metricName());
+                for (InstanceStatus status : statusList) {
+                    LOG.info("Instance {} has status: {}", status.instanceId(), status);
+                    received++;
 
-                    for (Dimension dimension : metric.dimensions()) {
-                        LOG.info("Dimension {} value: {}", dimension.name(), dimension.value());
-                        received++;
 
-                        if (received == expect) {
-                            return;
-                        }
+                    if (received >= expect) {
+                        return;
                     }
                 }
 
-                try {
-                    Thread.sleep(Duration.ofSeconds(1).toMillis());
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-
+                if (!waitForData()) {
                     break;
                 }
             }
@@ -134,28 +119,32 @@ public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
             fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
                     received, expect));
         }
-    }
 
 
+    }
+
     @Test
-    @Timeout(value = 120)
+    @Timeout(90)
     public void testBasicSendReceive() {
         try {
             Properties amazonProperties = awsService.getConnectionProperties();
             String topicName = TestUtils.getDefaultTestTopic(this.getClass());
 
-            ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory
+            ConnectorPropertyFactory testProperties = CamelAWSEC2PropertyFactory
                     .basic()
                     .withTopics(topicName)
                     .withConfiguration(TestCloudWatchConfiguration.class.getName())
                     .withAmazonConfig(amazonProperties)
-                    .withName(metricName)
-                    .withSinkPathNamespace(namespace);
+                    .withSinkPathLabel(logicalName)
+                    .withConfiguration(TestEC2Configuration.class.getName())
+                    .withSinkEndpointOperation("createAndRunInstances");
 
             runTest(testProperties, topicName, expect);
         } catch (Exception e) {
-            LOG.error("Amazon CloudWatch test failed: {}", e.getMessage(), e);
+            LOG.error("Amazon EC2 test failed: {}", e.getMessage(), e);
             fail(e.getMessage());
         }
     }
+
+
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/TestEC2Configuration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/TestEC2Configuration.java
new file mode 100644
index 0000000..783316a
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/TestEC2Configuration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.ec2.sink;
+
+import org.apache.camel.component.aws2.ec2.AWS2EC2Configuration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import software.amazon.awssdk.services.ec2.Ec2Client;
+
+public class TestEC2Configuration extends AWS2EC2Configuration {
+    private Ec2Client client;
+
+    @Override
+    public Ec2Client getAmazonEc2Client() {
+        if (client == null) {
+            client = AWSSDKClientUtils.newEC2Client();
+        }
+
+        return client;
+    }
+}

Reply via email to