This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new db4fa44 Added new integration test for AWS 2 EC2
db4fa44 is described below
commit db4fa44ba9a622fa3b879d30fd77adafb5582bb5
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Jan 22 14:51:26 2021 +0100
Added new integration test for AWS 2 EC2
---
tests/itests-aws-v2/pom.xml | 7 ++
.../aws/v2/common/CamelSinkAWSTestSupport.java | 12 +++
.../aws/v2/cw/sink/CamelSinkAWSCWITCase.java | 7 +-
.../v2/ec2/sink/CamelAWSEC2PropertyFactory.java | 73 +++++++++++++++++
.../sink/CamelSinkAWSEC2ITCase.java} | 91 ++++++++++------------
.../aws/v2/ec2/sink/TestEC2Configuration.java | 35 +++++++++
6 files changed, 168 insertions(+), 57 deletions(-)
diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml
index caf3c28..8669c50 100644
--- a/tests/itests-aws-v2/pom.xml
+++ b/tests/itests-aws-v2/pom.xml
@@ -72,6 +72,13 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws2-cw</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-aws2-ec2</artifactId>
+ </dependency>
+
+
</dependencies>
<build>
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
index a66a474..6f77de6 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
@@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector.aws.v2.common;
+import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -72,6 +73,17 @@ public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
verifyMessages(latch);
}
+ protected boolean waitForData() {
+ try {
+ Thread.sleep(Duration.ofSeconds(1).toMillis());
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ return false;
+ }
+ }
+
protected abstract void consumeMessages(CountDownLatch latch);
protected abstract void verifyMessages(CountDownLatch latch) throws
InterruptedException;
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
index 0888b10..71f56f1 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
@@ -17,7 +17,6 @@
package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
-import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -113,11 +112,7 @@ public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
}
}
- try {
- Thread.sleep(Duration.ofSeconds(1).toMillis());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
+ if (!waitForData()) {
break;
}
}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelAWSEC2PropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelAWSEC2PropertyFactory.java
new file mode 100644
index 0000000..3a35bbb
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelAWSEC2PropertyFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.ec2.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+
+public class CamelAWSEC2PropertyFactory extends
SinkConnectorPropertyFactory<CamelAWSEC2PropertyFactory> {
+ public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+ public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+ static {
+ SPRING_STYLE.put(AWSConfigs.ACCESS_KEY,
"camel.component.aws2-ec2.accessKey");
+ SPRING_STYLE.put(AWSConfigs.SECRET_KEY,
"camel.component.aws2-ec2.secretKey");
+ SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-ec2.region");
+
+ KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY,
"camel.component.aws2-ec2.access-key");
+ KAFKA_STYLE.put(AWSConfigs.SECRET_KEY,
"camel.component.aws2-ec2.secret-key");
+ KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-ec2.region");
+ }
+
+ public CamelAWSEC2PropertyFactory withSinkPathLabel(String value) {
+ return setProperty("camel.sink.path.label", value);
+ }
+
+ public CamelAWSEC2PropertyFactory withSinkEndpointOperation(String value) {
+ return setProperty("camel.sink.endpoint.operation", value);
+ }
+
+ public CamelAWSEC2PropertyFactory withConfiguration(String value) {
+ return setProperty("camel.component.aws2-ec2.configuration",
classRef(value));
+ }
+
+ public CamelAWSEC2PropertyFactory withAmazonConfig(Properties
amazonConfigs) {
+ return withAmazonConfig(amazonConfigs, this.SPRING_STYLE);
+ }
+
+ public CamelAWSEC2PropertyFactory withAmazonConfig(Properties
amazonConfigs, Map<String, String> style) {
+ AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+
+ return this;
+ }
+
+ public static CamelAWSEC2PropertyFactory basic() {
+ return new CamelAWSEC2PropertyFactory()
+ .withTasksMax(1)
+ .withName("CamelAws2ec2SinkConnector")
+
.withConnectorClass("org.apache.camel.kafkaconnector.aws2ec2.CamelAws2ec2SinkConnector")
+
.withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+
.withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+ }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
similarity index 60%
copy from tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
copy to tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
index 0888b10..22176b0 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
+package org.apache.camel.kafkaconnector.aws.v2.ec2.sink;
-import java.time.Duration;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
+import
org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguration;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -33,47 +34,37 @@ import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
-import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
-import software.amazon.awssdk.services.cloudwatch.model.Dimension;
-import software.amazon.awssdk.services.cloudwatch.model.ListMetricsRequest;
-import software.amazon.awssdk.services.cloudwatch.model.ListMetricsResponse;
-import software.amazon.awssdk.services.cloudwatch.model.Metric;
+import software.amazon.awssdk.services.ec2.Ec2Client;
+import software.amazon.awssdk.services.ec2.model.DescribeInstanceStatusRequest;
+import software.amazon.awssdk.services.ec2.model.InstanceStatus;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
-@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
-
+public class CamelSinkAWSEC2ITCase extends CamelSinkAWSTestSupport {
@RegisterExtension
- public static AWSService awsService =
AWSServiceFactory.createCloudWatchService();
- private static final Logger LOG =
LoggerFactory.getLogger(CamelSinkAWSCWITCase.class);
+ public static AWSService awsService = AWSServiceFactory.createEC2Service();
+ private static final Logger LOG =
LoggerFactory.getLogger(CamelSinkAWSEC2ITCase.class);
- private CloudWatchClient client;
- private String namespace;
- private String metricName = "test-metric";
+ private Ec2Client client;
+ private String logicalName;
private volatile int received;
private final int expect = 10;
@Override
protected String[] getConnectorsInTest() {
- return new String[] {"camel-aws2-cw-kafka-connector"};
+ return new String[] {"camel-aws2-ec2-kafka-connector"};
}
@BeforeEach
public void setUp() {
- client = AWSSDKClientUtils.newCloudWatchClient();
-
- namespace = "cw-" + TestUtils.randomWithRange(0, 1000);
- LOG.debug("Using namespace {} for the test", namespace);
+ client = AWSSDKClientUtils.newEC2Client();
+ logicalName = "ec2-" + TestUtils.randomWithRange(1, 100);
received = 0;
}
@@ -82,9 +73,12 @@ public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
protected Map<String, String> messageHeaders(String text, int current) {
Map<String, String> headers = new HashMap<>();
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX +
"CamelAwsCwMetricDimensionName",
- "test-dimension-" + current);
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX +
"CamelAwsCwMetricDimensionValue", String.valueOf(current));
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2ImageId",
+ "image-id-" + current);
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX +
"CamelAwsEC2InstanceType", "T1_MICRO");
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX +
"CamelAwsEC2InstanceMinCount", "1");
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX +
"CamelAwsEC2InstanceMaxCount", "1");
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX +
"CamelAwsEC2InstanceSecurityGroups", "default");
return headers;
}
@@ -92,32 +86,23 @@ public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
@Override
protected void consumeMessages(CountDownLatch latch) {
try {
- ListMetricsRequest request = ListMetricsRequest.builder()
- .namespace(namespace)
- .metricName(metricName)
- .build();
-
while (true) {
- ListMetricsResponse response = client.listMetrics(request);
+ DescribeInstanceStatusRequest request =
DescribeInstanceStatusRequest.builder()
+ .includeAllInstances(true)
+ .build();
+ List<InstanceStatus> statusList =
client.describeInstanceStatus(request).instanceStatuses();
- for (Metric metric : response.metrics()) {
- LOG.info("Retrieved metric {}", metric.metricName());
+ for (InstanceStatus status : statusList) {
+ LOG.info("Instance {} has status: {}",
status.instanceId(), status);
+ received++;
- for (Dimension dimension : metric.dimensions()) {
- LOG.info("Dimension {} value: {}", dimension.name(),
dimension.value());
- received++;
- if (received == expect) {
- return;
- }
+ if (received >= expect) {
+ return;
}
}
- try {
- Thread.sleep(Duration.ofSeconds(1).toMillis());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
+ if (!waitForData()) {
break;
}
}
@@ -134,28 +119,32 @@ public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
fail(String.format("Failed to receive the messages within the
specified time: received %d of %d",
received, expect));
}
- }
+ }
+
@Test
- @Timeout(value = 120)
+ @Timeout(90)
public void testBasicSendReceive() {
try {
Properties amazonProperties = awsService.getConnectionProperties();
String topicName = TestUtils.getDefaultTestTopic(this.getClass());
- ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory
+ ConnectorPropertyFactory testProperties =
CamelAWSEC2PropertyFactory
.basic()
.withTopics(topicName)
.withConfiguration(TestCloudWatchConfiguration.class.getName())
.withAmazonConfig(amazonProperties)
- .withName(metricName)
- .withSinkPathNamespace(namespace);
+ .withSinkPathLabel(logicalName)
+ .withConfiguration(TestEC2Configuration.class.getName())
+ .withSinkEndpointOperation("createAndRunInstances");
runTest(testProperties, topicName, expect);
} catch (Exception e) {
- LOG.error("Amazon CloudWatch test failed: {}", e.getMessage(), e);
+ LOG.error("Amazon EC2 test failed: {}", e.getMessage(), e);
fail(e.getMessage());
}
}
+
+
}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/TestEC2Configuration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/TestEC2Configuration.java
new file mode 100644
index 0000000..783316a
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/TestEC2Configuration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.ec2.sink;
+
+import org.apache.camel.component.aws2.ec2.AWS2EC2Configuration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import software.amazon.awssdk.services.ec2.Ec2Client;
+
+public class TestEC2Configuration extends AWS2EC2Configuration {
+ private Ec2Client client;
+
+ @Override
+ public Ec2Client getAmazonEc2Client() {
+ if (client == null) {
+ client = AWSSDKClientUtils.newEC2Client();
+ }
+
+ return client;
+ }
+}