This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/camel-master by this push: new 9625df1 Make producing test messages for sink tests more flexible 9625df1 is described below commit 9625df18bf5f32eab8b60badf95fa599f8c5e9c6 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Feb 3 18:26:19 2021 +0100 Make producing test messages for sink tests more flexible --- .../aws/v1/sns/sink/CamelSinkAWSSNSITCase.java | 6 -- .../aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java | 6 -- .../aws/v2/cw/sink/CamelSinkAWSCWITCase.java | 60 ++++++++++--------- .../aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java | 69 +++++++++++----------- .../aws/v2/iam/sink/CamelSinkAWSIAMITCase.java | 52 ++++++++-------- .../v2/kinesis/sink/CamelSinkAWSKinesisITCase.java | 21 ++++--- .../aws/v2/kms/sink/CamelSinkAWSKMSITCase.java | 25 +++++--- .../aws/v2/s3/sink/CamelSinkAWSS3ITCase.java | 26 +++++--- .../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java | 5 -- .../blob/sink/CamelSinkAzureStorageBlobITCase.java | 47 ++++++++------- .../sink/CamelSinkAzureStorageQueueITCase.java | 6 -- .../cassandra/sink/CamelSinkCassandraITCase.java | 6 -- .../common/test/AbstractTestMessageProducer.java | 59 ++++++++++++++++++ .../common/test/CamelSinkTestSupport.java | 64 ++++++++++---------- ...cer.java => FunctionalTestMessageProducer.java} | 4 +- ...ageProducer.java => StringMessageProducer.java} | 29 ++++++++- .../common/test/TestMessageProducer.java | 13 +++- .../couchbase/sink/CamelSinkCouchbaseITCase.java | 43 ++++++++------ .../sink/CamelSinkElasticSearchITCase.java | 7 --- .../file/sink/CamelSinkFileITCase.java | 27 +++++---- .../hdfs/sink/CamelSinkHDFSITCase.java | 25 ++++---- .../http/sink/CamelSinkHTTPITCase.java | 5 -- .../jdbc/sink/CamelSinkJDBCITCase.java | 41 +++++++------ .../mongodb/sink/CamelSinkMongoDBITCase.java | 25 ++++---- .../rabbitmq/sink/RabbitMQSinkITCase.java | 6 -- .../sjms2/sink/CamelSinkIdempotentJMSITCase.java | 4 -- .../sjms2/sink/CamelSinkJMSITCase.java | 6 -- .../sql/sink/CamelSinkSQLITCase.java | 40 ++++++++----- .../ssh/sink/CamelSinkSshITCase.java | 23 ++++---- .../syslog/sink/CamelSinkSyslogITCase.java | 24 ++++---- 30 files changed, 434 insertions(+), 340 deletions(-) diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java index 8d893c9..aea8c76 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java @@ -18,7 +18,6 @@ package org.apache.camel.kafkaconnector.aws.v1.sns.sink; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -81,11 +80,6 @@ public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport { } @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } - - @Override protected void verifyMessages(CountDownLatch latch) throws InterruptedException { if (latch.await(120, TimeUnit.SECONDS)) { assertEquals(expect, received, diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java index b38441f..894114f 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java @@ -18,7 +18,6 @@ package org.apache.camel.kafkaconnector.aws.v1.sqs.sink; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -91,11 +90,6 @@ public class CamelSinkAWSSQSITCase extends CamelSinkTestSupport { } @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } - - @Override protected void verifyMessages(CountDownLatch latch) throws InterruptedException { if (latch.await(110, TimeUnit.SECONDS)) { assertEquals(expect, received, diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java index 9b27827..550ea47 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.kafkaconnector.CamelSinkTask; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.services.AWSService; import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; @@ -62,6 +63,23 @@ public class CamelSinkAWSCWITCase extends CamelSinkTestSupport { private volatile int received; private final int expect = 10; + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public Map<String, String> messageHeaders(String text, int current) { + Map<String, String> headers = new HashMap<>(); + + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionName", + "test-dimension-" + current); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionValue", String.valueOf(current)); + + return headers; + } + } + @Override protected String[] getConnectorsInTest() { return new String[] {"camel-aws2-cw-kafka-connector"}; @@ -78,17 +96,6 @@ public class CamelSinkAWSCWITCase extends CamelSinkTestSupport { } @Override - protected Map<String, String> messageHeaders(String text, int current) { - Map<String, String> headers = new HashMap<>(); - - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionName", - "test-dimension-" + current); - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionValue", String.valueOf(current)); - - return headers; - } - - @Override protected void consumeMessages(CountDownLatch latch) { try { ListMetricsRequest request = ListMetricsRequest.builder() @@ -134,23 +141,18 @@ public class CamelSinkAWSCWITCase extends CamelSinkTestSupport { @Test @Timeout(value = 120) - public void testBasicSendReceive() { - try { - Properties amazonProperties = awsService.getConnectionProperties(); - String topicName = TestUtils.getDefaultTestTopic(this.getClass()); - - ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory - .basic() - .withTopics(topicName) - .withConfiguration(TestCloudWatchConfiguration.class.getName()) - .withAmazonConfig(amazonProperties) - .withName(metricName) - .withSinkPathNamespace(namespace); - - runTest(testProperties, topicName, expect); - } catch (Exception e) { - LOG.error("Amazon CloudWatch test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + public void testBasicSendReceive() throws Exception { + Properties amazonProperties = awsService.getConnectionProperties(); + String topicName = TestUtils.getDefaultTestTopic(this.getClass()); + + ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory + .basic() + .withTopics(topicName) + .withConfiguration(TestCloudWatchConfiguration.class.getName()) + .withAmazonConfig(amazonProperties) + .withName(metricName) + .withSinkPathNamespace(namespace); + + runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } } diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java index ee6e350..0c64bcf 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java @@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.CamelSinkTask; import org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguration; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.services.AWSService; import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; @@ -56,6 +57,26 @@ public class CamelSinkAWSEC2ITCase extends CamelSinkTestSupport { private volatile int received; private final int expect = 10; + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public Map<String, String> messageHeaders(String text, int current) { + Map<String, String> headers = new HashMap<>(); + + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2ImageId", + "image-id-" + current); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceType", "T1_MICRO"); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMinCount", "1"); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMaxCount", "1"); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceSecurityGroups", "default"); + + return headers; + } + } + @Override protected String[] getConnectorsInTest() { return new String[] {"camel-aws2-ec2-kafka-connector"}; @@ -69,19 +90,6 @@ public class CamelSinkAWSEC2ITCase extends CamelSinkTestSupport { received = 0; } - @Override - protected Map<String, String> messageHeaders(String text, int current) { - Map<String, String> headers = new HashMap<>(); - - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2ImageId", - "image-id-" + current); - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceType", "T1_MICRO"); - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMinCount", "1"); - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMaxCount", "1"); - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceSecurityGroups", "default"); - - return headers; - } @Override protected void consumeMessages(CountDownLatch latch) { @@ -119,31 +127,24 @@ public class CamelSinkAWSEC2ITCase extends CamelSinkTestSupport { fail(String.format("Failed to receive the messages within the specified time: received %d of %d", received, expect)); } - - } @Test @Timeout(90) - public void testBasicSendReceive() { - try { - Properties amazonProperties = awsService.getConnectionProperties(); - String topicName = TestUtils.getDefaultTestTopic(this.getClass()); - - ConnectorPropertyFactory testProperties = CamelAWSEC2PropertyFactory - .basic() - .withTopics(topicName) - .withConfiguration(TestCloudWatchConfiguration.class.getName()) - .withAmazonConfig(amazonProperties) - .withSinkPathLabel(logicalName) - .withConfiguration(TestEC2Configuration.class.getName()) - .withSinkEndpointOperation("createAndRunInstances"); - - runTest(testProperties, topicName, expect); - } catch (Exception e) { - LOG.error("Amazon EC2 test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + public void testBasicSendReceive() throws Exception { + Properties amazonProperties = awsService.getConnectionProperties(); + String topicName = TestUtils.getDefaultTestTopic(this.getClass()); + + ConnectorPropertyFactory testProperties = CamelAWSEC2PropertyFactory + .basic() + .withTopics(topicName) + .withConfiguration(TestCloudWatchConfiguration.class.getName()) + .withAmazonConfig(amazonProperties) + .withSinkPathLabel(logicalName) + .withConfiguration(TestEC2Configuration.class.getName()) + .withSinkEndpointOperation("createAndRunInstances"); + + runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java index 7c212bc..f88e078 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java @@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.CamelSinkTask; import org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguration; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.services.AWSService; import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; @@ -56,14 +57,20 @@ public class CamelSinkAWSIAMITCase extends CamelSinkTestSupport { private volatile int received; private final int expect = 10; - @Override - protected Map<String, String> messageHeaders(String text, int current) { - Map<String, String> headers = new HashMap<>(); + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public Map<String, String> messageHeaders(String text, int current) { + Map<String, String> headers = new HashMap<>(); - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsIAMUsername", - "username-" + current); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsIAMUsername", + "username-" + current); - return headers; + return headers; + } } @Override @@ -117,24 +124,19 @@ public class CamelSinkAWSIAMITCase extends CamelSinkTestSupport { @Test @Timeout(90) - public void testBasicSendReceive() { - try { - Properties amazonProperties = awsService.getConnectionProperties(); - String topicName = TestUtils.getDefaultTestTopic(this.getClass()); - - ConnectorPropertyFactory testProperties = CamelAWSIAMPropertyFactory - .basic() - .withTopics(topicName) - .withConfiguration(TestCloudWatchConfiguration.class.getName()) - .withAmazonConfig(amazonProperties) - .withSinkPathLabel(logicalName) - .withConfiguration(TestIAMConfiguration.class.getName()) - .withSinkEndpointOperation("createUser"); - - runTest(testProperties, topicName, expect); - } catch (Exception e) { - LOG.error("Amazon IAM test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + public void testBasicSendReceive() throws Exception { + Properties amazonProperties = awsService.getConnectionProperties(); + String topicName = TestUtils.getDefaultTestTopic(this.getClass()); + + ConnectorPropertyFactory testProperties = CamelAWSIAMPropertyFactory + .basic() + .withTopics(topicName) + .withConfiguration(TestCloudWatchConfiguration.class.getName()) + .withAmazonConfig(amazonProperties) + .withSinkPathLabel(logicalName) + .withConfiguration(TestIAMConfiguration.class.getName()) + .withSinkEndpointOperation("createUser"); + + runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } } diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java index b975a3d..9ca84ef 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java @@ -29,6 +29,7 @@ import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils; import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfiguration; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.services.AWSService; @@ -64,14 +65,20 @@ public class CamelSinkAWSKinesisITCase extends CamelSinkTestSupport { private volatile int received; private final int expect = 10; - @Override - protected Map<String, String> messageHeaders(String text, int current) { - Map<String, String> headers = new HashMap<>(); + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKinesisPartitionKey", - "partition-" + current); + @Override + public Map<String, String> messageHeaders(String text, int current) { + Map<String, String> headers = new HashMap<>(); - return headers; + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKinesisPartitionKey", + "partition-" + current); + + return headers; + } } @Override @@ -143,6 +150,6 @@ public class CamelSinkAWSKinesisITCase extends CamelSinkTestSupport { .withConfiguration(TestKinesisConfiguration.class.getName()) .withStreamName(streamName); - runTest(connectorPropertyFactory, topicName, expect); + runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } } diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java index 4f57799..f58c54a 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.kafkaconnector.CamelSinkTask; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.services.AWSService; import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; @@ -62,18 +63,24 @@ public class CamelSinkAWSKMSITCase extends CamelSinkTestSupport { private volatile int received; private final int expect = 10; - @Override - protected Map<String, String> messageHeaders(String text, int current) { - Map<String, String> headers = new HashMap<>(); + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSKeyId", - String.valueOf(current)); + @Override + public Map<String, String> messageHeaders(String text, int current) { + Map<String, String> headers = new HashMap<>(); - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSDescription", - "test key " + current); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSKeyId", + String.valueOf(current)); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSDescription", + "test key " + current); - return headers; + + return headers; + } } @Override @@ -146,6 +153,6 @@ public class CamelSinkAWSKMSITCase extends CamelSinkTestSupport { .withSinkEndpointOperation("createKey") .withSinkPathLabel(logicalName); - runTest(connectorPropertyFactory, topicName, expect); + runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } } diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java index ea77a09..1311507 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java @@ -29,6 +29,7 @@ import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils; import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.services.AWSService; @@ -59,18 +60,25 @@ public class CamelSinkAWSS3ITCase extends CamelSinkTestSupport { private volatile int received; private int expect = 10; - @Override - protected Map<String, String> messageHeaders(String text, int current) { - Map<String, String> headers = new HashMap<>(); + private class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public Map<String, String> messageHeaders(String text, int current) { + Map<String, String> headers = new HashMap<>(); - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3Key", - "file" + current + ".txt"); - headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3BucketName", - bucketName); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3Key", + "file" + current + ".txt"); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3BucketName", + bucketName); - return headers; + return headers; + } } + @Override protected void consumeMessages(CountDownLatch latch) { try { @@ -147,6 +155,6 @@ public class CamelSinkAWSS3ITCase extends CamelSinkTestSupport { .withBucketNameOrArn(bucketName) .withAutoCreateBucket(true); - runTest(testProperties, topicName, expect); + runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } } diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java index 85b305b..c686377 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java @@ -18,7 +18,6 @@ package org.apache.camel.kafkaconnector.aws.v2.sqs.sink; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -88,10 +87,6 @@ public class CamelSinkAWSSQSITCase extends CamelSinkTestSupport { } } - @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } @Override protected void verifyMessages(CountDownLatch latch) throws InterruptedException { diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java index 1bbf9f1..727d3fb 100644 --- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java +++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java @@ -30,6 +30,7 @@ import com.azure.storage.blob.models.BlobItem; import org.apache.camel.kafkaconnector.CamelSinkTask; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.azure.common.AzureCredentialsHolder; import org.apache.camel.test.infra.azure.common.services.AzureService; @@ -62,6 +63,30 @@ public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport { private int expect = 10; private int received; + private class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public String testMessageContent(int current) { + return "test " + current + " data"; + } + + @Override + public Map<String, String> messageHeaders(String text, int current) { + Map<String, String> messageParameters = new HashMap<>(); + + String sentFile = "test " + current; + + sentData.put(sentFile, testMessageContent(current)); + + messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageBlobBlobName", sentFile); + + return messageParameters; + } + } + @Override protected String[] getConnectorsInTest() { return new String[]{"camel-azure-storage-blob-kafka-connector"}; @@ -84,24 +109,6 @@ public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport { } @Override - protected String testMessageContent(int current) { - return "test " + current + " data"; - } - - @Override - protected Map<String, String> messageHeaders(String text, int current) { - Map<String, String> messageParameters = new HashMap<>(); - - String sentFile = "test " + current; - - sentData.put(sentFile, testMessageContent(current)); - - messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageBlobBlobName", sentFile); - - return messageParameters; - } - - @Override protected void consumeMessages(CountDownLatch latch) { try { consume(); @@ -163,7 +170,7 @@ public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport { .withContainerName(blobContainerName) .withOperation("uploadBlockBlob"); - runTest(factory, topicName, expect); + runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } @Test @@ -180,6 +187,6 @@ public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport { .append("operation", "uploadBlockBlob") .buildUrl(); - runTest(factory, topicName, expect); + runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } } diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java index d447703..ef12c18 100644 --- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java +++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java @@ -17,7 +17,6 @@ package org.apache.camel.kafkaconnector.azure.storage.queue.sink; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -82,11 +81,6 @@ public class CamelSinkAzureStorageQueueITCase extends CamelSinkTestSupport { } @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } - - @Override protected void consumeMessages(CountDownLatch latch) { try { consume(); diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java index 2949fff..b88946e 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java @@ -17,7 +17,6 @@ package org.apache.camel.kafkaconnector.cassandra.sink; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -85,11 +84,6 @@ public class CamelSinkCassandraITCase extends CamelSinkTestSupport { } @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } - - @Override protected void consumeMessages(CountDownLatch latch) { try { if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) { diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java new file mode 100644 index 0000000..28d3d0d --- /dev/null +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.common.test; + +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractTestMessageProducer<T> implements TestMessageProducer<T> { + private static final Logger LOG = LoggerFactory.getLogger(AbstractTestMessageProducer.class); + + private final KafkaClient<String, T> kafkaClient; + private final String topicName; + private final int count; + + public AbstractTestMessageProducer(KafkaClient<String, T> kafkaClient, String topicName, int count) { + this.kafkaClient = kafkaClient; + this.topicName = topicName; + this.count = count; + } + + public AbstractTestMessageProducer(String bootstrapServer, String topicName, int count) { + this.kafkaClient = new KafkaClient<>(bootstrapServer); + this.topicName = topicName; + this.count = count; + } + + public void produceMessages() throws ExecutionException, InterruptedException { + LOG.trace("Producing messages ..."); + for (int i = 0; i < count; i++) { + T message = testMessageContent(i); + Map<String, String> headers = messageHeaders(message, i); + + if (headers == null) { + kafkaClient.produce(topicName, message); + } else { + kafkaClient.produce(topicName, message, headers); + } + } + } +} diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java index bd02eef..b414726 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java @@ -18,7 +18,6 @@ package org.apache.camel.kafkaconnector.common.test; import java.time.Duration; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -26,41 +25,12 @@ import java.util.concurrent.Executors; import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.jupiter.api.Assertions.fail; - public abstract class CamelSinkTestSupport extends AbstractKafkaTest { private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTestSupport.class); - protected abstract Map<String, String> messageHeaders(String text, int current); - - protected String testMessageContent(int current) { - return "Sink test message " + current; - } - - protected void produceMessages(String topicName, int count) { - try { - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - - for (int i = 0; i < count; i++) { - String message = testMessageContent(i); - Map<String, String> headers = messageHeaders(message, i); - - if (headers == null) { - kafkaClient.produce(topicName, message); - } else { - kafkaClient.produce(topicName, message, headers); - } - } - } catch (Throwable t) { - LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t); - fail(String.format("Unable to publish messages to the broker: %s", t.getMessage())); - } - } - /** * A simple test runner that follows the steps: initialize, start consumer, produce messages, verify results * @@ -70,7 +40,35 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest { * @throws Exception For test-specific exceptions */ protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception { - runTest(connectorPropertyFactory, () -> produceMessages(topic, count)); + StringMessageProducer stringMessageProducer = new StringMessageProducer(getKafkaService().getBootstrapServers(), + topic, count); + + runTest(connectorPropertyFactory, stringMessageProducer); + } + + /** + * A simple test runner that follows the steps: initialize, start consumer, produce messages, verify results + * + * @param connectorPropertyFactory A factory for connector properties + * @throws Exception For test-specific exceptions + */ + protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws Exception { + connectorPropertyFactory.log(); + getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); + + LOG.debug("Creating the consumer ..."); + ExecutorService service = Executors.newCachedThreadPool(); + + CountDownLatch latch = new CountDownLatch(1); + service.submit(() -> consumeMessages(latch)); + + producer.produceMessages(); + + LOG.debug("Waiting for the messages to be processed"); + service.shutdown(); + + LOG.debug("Waiting for the test to complete"); + verifyMessages(latch); } /** @@ -80,7 +78,7 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest { * @throws ExecutionException * @throws InterruptedException */ - protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws ExecutionException, InterruptedException { + protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException { connectorPropertyFactory.log(); getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); @@ -90,7 +88,7 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest { CountDownLatch latch = new CountDownLatch(1); service.submit(() -> consumeMessages(latch)); - producer.producerMessages(); + producer.produceMessages(); LOG.debug("Waiting for the messages to be processed"); service.shutdown(); diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/FunctionalTestMessageProducer.java similarity index 91% copy from tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java copy to tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/FunctionalTestMessageProducer.java index dedcf97..794fefa 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/FunctionalTestMessageProducer.java @@ -18,6 +18,6 @@ package org.apache.camel.kafkaconnector.common.test; @FunctionalInterface -public interface TestMessageProducer { - void producerMessages(); +public interface FunctionalTestMessageProducer { + void produceMessages(); } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageProducer.java similarity index 51% copy from tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java copy to tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageProducer.java index dedcf97..c25d2b5 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageProducer.java @@ -17,7 +17,30 @@ package org.apache.camel.kafkaconnector.common.test; -@FunctionalInterface -public interface TestMessageProducer { - void producerMessages(); +import java.util.Map; + +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; + +/** + * A producer that sends the 'count' amount of text messages to the Kafka broker + */ +public class StringMessageProducer extends AbstractTestMessageProducer<String> { + + public StringMessageProducer(String bootStrapServer, String topicName, int count) { + super(bootStrapServer, topicName, count); + } + + public StringMessageProducer(KafkaClient<String, String> kafkaClient, String topicName, int count) { + super(kafkaClient, topicName, count); + } + + @Override + public Map<String, String> messageHeaders(String text, int current) { + return null; + } + + @Override + public String testMessageContent(int current) { + return "Sink test message " + current; + } } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java index dedcf97..ff69c5a 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java @@ -17,7 +17,14 @@ package org.apache.camel.kafkaconnector.common.test; -@FunctionalInterface -public interface TestMessageProducer { - void producerMessages(); +import java.util.Map; +import java.util.concurrent.ExecutionException; + +/** + * A producer of test messages + */ +public interface TestMessageProducer<T> { + Map<String, String> messageHeaders(T text, int current); + T testMessageContent(int current); + void produceMessages() throws ExecutionException, InterruptedException; } diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java index 15104ac..46bf50d 100644 --- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java +++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java @@ -34,6 +34,7 @@ import com.couchbase.client.java.query.QueryResult; import org.apache.camel.kafkaconnector.CamelSinkTask; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.couchbase.services.CouchbaseService; import org.apache.camel.test.infra.couchbase.services.CouchbaseServiceFactory; @@ -74,6 +75,28 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport { private final int expect = 10; + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public String testMessageContent(int current) { + JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", current)); + + return jsonObject.toString(); + } + + @Override + public Map<String, String> messageHeaders(String text, int current) { + Map<String, String> parameters = new HashMap<>(); + + parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(current)); + + return parameters; + } + } + @Override protected String[] getConnectorsInTest() { return new String[] {"camel-couchbase-kafka-connector"}; @@ -116,22 +139,6 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport { } @Override - protected String testMessageContent(int current) { - JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", current)); - - return jsonObject.toString(); - } - - @Override - protected Map<String, String> messageHeaders(String text, int current) { - Map<String, String> parameters = new HashMap<>(); - - parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(current)); - - return parameters; - } - - @Override protected void consumeMessages(CountDownLatch latch) { try { TestUtils.waitFor(this::waitForMinimumRecordCount); @@ -210,7 +217,7 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport { .withUsername(service.getUsername()) .withPassword(service.getPassword()); - runTest(factory, topic, expect); + runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect)); } @RepeatedTest(10) @@ -229,6 +236,6 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport { .buildUrl(); - runTest(factory, topic, expect); + runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect)); } } diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java index c80f892..bc12b51 100644 --- a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java +++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java @@ -17,7 +17,6 @@ package org.apache.camel.kafkaconnector.elasticsearch.sink; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -74,11 +73,6 @@ public class CamelSinkElasticSearchITCase extends CamelSinkTestSupport { } @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } - - @Override protected void consumeMessages(CountDownLatch latch) { try { client.waitForIndex(); @@ -88,7 +82,6 @@ public class CamelSinkElasticSearchITCase extends CamelSinkTestSupport { } finally { latch.countDown(); } - } @Override diff --git a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java index ead6c58..0e19e45 100644 --- a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java +++ b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java @@ -27,12 +27,12 @@ import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,6 +57,17 @@ public class CamelSinkFileITCase extends CamelSinkTestSupport { private String topicName; private final int expect = 1; + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public String testMessageContent(int current) { + return "test"; + } + } + @Override protected String[] getConnectorsInTest() { return new String[] {"camel-file-kafka-connector"}; @@ -81,16 +92,6 @@ public class CamelSinkFileITCase extends CamelSinkTestSupport { } @Override - protected String testMessageContent(int current) { - return "test"; - } - - @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } - - @Override protected void consumeMessages(CountDownLatch latch) { try { File sinkFile = new File(SINK_DIR, FILENAME); @@ -195,7 +196,7 @@ public class CamelSinkFileITCase extends CamelSinkTestSupport { .withFileName(FILENAME) .withDoneFileName(FILENAME + ".done"); - runTest(connectorPropertyFactory, topicName, expect); + runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } @Test @@ -208,6 +209,6 @@ public class CamelSinkFileITCase extends CamelSinkTestSupport { .append("doneFileName", FILENAME + ".done") .buildUrl(); - runTest(connectorPropertyFactory, topicName, expect); + runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } } diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java index c7e7cc3..f12f310 100644 --- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java +++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java @@ -19,12 +19,12 @@ package org.apache.camel.kafkaconnector.hdfs.sink; import java.io.IOException; import java.net.URISyntaxException; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.hdfs.utils.HDFSEasy; import org.apache.camel.test.infra.hdfs.v2.services.HDFSService; @@ -57,6 +57,17 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport { private final int expect = 10; + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public String testMessageContent(int current) { + return "Sink test message: " + current; + } + } + @Override protected String[] getConnectorsInTest() { return new String[] {"camel-hdfs-kafka-connector"}; @@ -84,16 +95,6 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport { } @Override - protected String testMessageContent(int current) { - return "Sink test message: " + current; - } - - @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } - - @Override protected void consumeMessages(CountDownLatch latch) { try { TestUtils.waitFor(this::filesCreated); @@ -153,6 +154,6 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport { .withPath(currentBasePath.getName()) .withSplitStrategy("MESSAGES:1,IDLE:1000"); - runTest(connectorPropertyFactory, topicName, expect); + runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } } diff --git a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java index ea5d2db..33bd066 100644 --- a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java +++ b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java @@ -20,7 +20,6 @@ package org.apache.camel.kafkaconnector.http.sink; import java.io.IOException; import java.net.InetAddress; import java.util.List; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -85,10 +84,6 @@ public class CamelSinkHTTPITCase extends CamelSinkTestSupport { } } - @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } @Override protected void consumeMessages(CountDownLatch latch) { diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java index 3663890..f5ed6ed 100644 --- a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java +++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.kafkaconnector.CamelSinkTask; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient; import org.apache.camel.kafkaconnector.jdbc.services.TestDataSource; @@ -73,6 +74,28 @@ public class CamelSinkJDBCITCase extends CamelSinkTestSupport { .build(); } + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public String testMessageContent(int current) { + return "insert into test(test_name, test_data) values(:?TestName, :?TestData)"; + } + + @Override + public Map<String, String> messageHeaders(String text, int current) { + Map<String, String> jdbcParameters = new HashMap<>(); + + // The prefix 'CamelHeader' is removed by the SinkTask + jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100)); + jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current); + + return jdbcParameters; + } + } + @BeforeEach public void setUp() throws SQLException { topicName = getTopicForTest(this); @@ -86,22 +109,6 @@ public class CamelSinkJDBCITCase extends CamelSinkTestSupport { } @Override - protected String testMessageContent(int current) { - return "insert into test(test_name, test_data) values(:?TestName, :?TestData)"; - } - - @Override - protected Map<String, String> messageHeaders(String text, int current) { - Map<String, String> jdbcParameters = new HashMap<>(); - - // The prefix 'CamelHeader' is removed by the SinkTask - jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100)); - jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current); - - return jdbcParameters; - } - - @Override protected void consumeMessages(CountDownLatch latch) { try { LOG.debug("Waiting for indices"); @@ -159,6 +166,6 @@ public class CamelSinkJDBCITCase extends CamelSinkTestSupport { .withUseHeaderAsParameters(true) .withTopics(topicName); - runTest(factory, topicName, expect); + runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } } diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java index da1b02a..29928af 100644 --- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java +++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java @@ -17,7 +17,6 @@ package org.apache.camel.kafkaconnector.mongodb.sink; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -27,6 +26,7 @@ import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.mongodb.services.MongoDBService; import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory; @@ -56,6 +56,17 @@ public class CamelSinkMongoDBITCase extends CamelSinkTestSupport { private final int expect = 10; + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public String testMessageContent(int current) { + return String.format("{\"test\": \"value %d\"}", current); + } + } + @Override protected String[] getConnectorsInTest() { return new String[]{"camel-mongodb-kafka-connector"}; @@ -69,16 +80,6 @@ public class CamelSinkMongoDBITCase extends CamelSinkTestSupport { } @Override - protected String testMessageContent(int current) { - return String.format("{\"test\": \"value %d\"}", current); - } - - @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } - - @Override protected void consumeMessages(CountDownLatch latch) { try { MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName); @@ -127,6 +128,6 @@ public class CamelSinkMongoDBITCase extends CamelSinkTestSupport { .withCollection("testRecords") .withOperation("insert"); - runTest(factory, topicName, expect); + runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } } diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java index 01ad213..82b97c0 100644 --- a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java @@ -17,7 +17,6 @@ package org.apache.camel.kafkaconnector.rabbitmq.sink; import java.nio.charset.StandardCharsets; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -74,11 +73,6 @@ public class RabbitMQSinkITCase extends CamelSinkTestSupport { } @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } - - @Override protected void consumeMessages(CountDownLatch latch) { DeliverCallback deliveryCallback = (consumerTag, delivery) -> { if (!this.checkRecord(delivery)) { diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java index 432a20a..0b8bb52 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java @@ -90,10 +90,6 @@ public class CamelSinkIdempotentJMSITCase extends CamelSinkTestSupport { destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100); } - @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } @Override protected void consumeMessages(CountDownLatch latch) { diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java index 5e9b66d..50dabe6 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java @@ -17,7 +17,6 @@ package org.apache.camel.kafkaconnector.sjms2.sink; -import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -85,11 +84,6 @@ public class CamelSinkJMSITCase extends CamelSinkTestSupport { } @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } - - @Override protected void verifyMessages(CountDownLatch latch) throws InterruptedException { if (latch.await(35, TimeUnit.SECONDS)) { assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect); diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java index 79bf8f9..a6d8bdd 100644 --- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.kafkaconnector.CamelSinkTask; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.sql.client.DatabaseClient; import org.apache.camel.kafkaconnector.sql.services.TestDataSource; @@ -54,6 +55,28 @@ public class CamelSinkSQLITCase extends CamelSinkTestSupport { private final int expect = 1; private int received; + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public String testMessageContent(int current) { + return "test"; + } + + @Override + public Map<String, String> messageHeaders(String text, int current) { + Map<String, String> sqlParameters = new HashMap<>(); + + // The prefix 'CamelHeader' is removed by the SinkTask + sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100)); + sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current); + + return sqlParameters; + } + } + public CamelSinkSQLITCase() { JdbcDatabaseContainer<?> container = new PostgreSQLContainer<>("postgres:9.6.2") .withDatabaseName("camel") @@ -80,21 +103,6 @@ public class CamelSinkSQLITCase extends CamelSinkTestSupport { client = new DatabaseClient(sqlService.jdbcUrl()); } - @Override - protected String testMessageContent(int current) { - return "test"; - } - - @Override - protected Map<String, String> messageHeaders(String text, int current) { - Map<String, String> sqlParameters = new HashMap<>(); - - // The prefix 'CamelHeader' is removed by the SinkTask - sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100)); - sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current); - - return sqlParameters; - } @Override protected void consumeMessages(CountDownLatch latch) { @@ -149,6 +157,6 @@ public class CamelSinkSQLITCase extends CamelSinkTestSupport { .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)") .withTopics(topicName); - runTest(factory, topicName, expect); + runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } } diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java index 1c71719..d0535d4 100644 --- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java @@ -17,12 +17,12 @@ package org.apache.camel.kafkaconnector.ssh.sink; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.ssh.services.SshService; import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory; @@ -47,6 +47,17 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport { private final int expect = 3; private String topic; + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public String testMessageContent(int current) { + return "date"; + } + } + @Override protected String[] getConnectorsInTest() { return new String[] {"camel-ssh-kafka-connector"}; @@ -57,15 +68,7 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport { topic = TestUtils.getDefaultTestTopic(this.getClass()); } - @Override - protected String testMessageContent(int current) { - return "date"; - } - @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } @Override protected void consumeMessages(CountDownLatch latch) { @@ -90,6 +93,6 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport { .withUsername("root") .withPassword("root"); - runTest(connectorPropertyFactory, topic, expect); + runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect)); } } diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java index 1b9f942..78eb2f4 100644 --- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java +++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java @@ -16,12 +16,12 @@ */ package org.apache.camel.kafkaconnector.syslog.sink; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; import org.apache.camel.kafkaconnector.syslog.services.SyslogService; import org.junit.jupiter.api.BeforeEach; @@ -49,6 +49,17 @@ public class CamelSinkSyslogITCase extends CamelSinkTestSupport { private String topicName; private final int expect = 1; + private static class CustomProducer extends StringMessageProducer { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + public String testMessageContent(int current) { + return TEST_TXT; + } + } + @Override protected String[] getConnectorsInTest() { return new String[] {"camel-syslog-kafka-connector"}; @@ -59,15 +70,6 @@ public class CamelSinkSyslogITCase extends CamelSinkTestSupport { topicName = getTopicForTest(this); } - @Override - protected String testMessageContent(int current) { - return TEST_TXT; - } - - @Override - protected Map<String, String> messageHeaders(String text, int current) { - return null; - } @Override protected void consumeMessages(CountDownLatch latch) { @@ -94,6 +96,6 @@ public class CamelSinkSyslogITCase extends CamelSinkTestSupport { .withPort(FREE_PORT) .withProtocol("udp"); - runTest(connectorPropertyFactory, topicName, expect); + runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } }