This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 7cee66a Defer topic deletion to the extensions to prevent an Exception on shutdown
     new 188c727 Merge pull request #374 from orpiske/topic-cleanup
7cee66a is described below

commit 7cee66ad2a492edb6fd14ad3ae13b5ae8df70b36
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Aug 12 17:30:18 2020 +0200

    Defer topic deletion to the extensions to prevent an Exception on shutdown
---
 .../aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java | 2 --
 .../aws/v1/s3/source/CamelSourceAWSS3ITCase.java | 2 --
 .../aws/v1/sns/sink/CamelSinkAWSSNSITCase.java | 6 ------
 .../aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java | 1 -
 .../aws/v1/sqs/source/CamelSourceAWSSQSITCase.java | 2 --
 .../aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java | 2 --
 .../cassandra/sink/CamelSinkCassandraITCase.java | 2 --
 .../cassandra/source/CamelSourceCassandraITCase.java | 2 --
 .../camel/kafkaconnector/common/AbstractKafkaTest.java | 14 --------------
 .../common/services/kafkaconnect/KafkaConnectEmbedded.java | 6 ++++++
 .../common/services/kafkaconnect/KafkaConnectRunner.java | 8 ++++++++
 .../elasticsearch/sink/CamelSinkElasticSearchITCase.java | 6 ------
 .../kafkaconnector/file/sink/CamelSinkFileITCase.java | 1 -
 .../kafkaconnector/http/sink/CamelSinkHTTPITCase.java | 2 --
 .../salesforce/sink/CamelSinkSalesforceITCase.java | 2 --
 .../salesforce/source/CamelSourceSalesforceITCase.java | 2 --
 .../kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java | 6 ------
 .../kafkaconnector/sjms2/source/CamelSourceJMSITCase.java | 6 ------
 .../kafkaconnector/slack/sink/CamelSinkSlackITCase.java | 6 ------
 .../kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java | 6 ------
 .../syslog/source/CamelSourceSyslogITCase.java | 8 --------
 .../timer/source/CamelSourceTimerITCase.java | 7 -------
 22 files changed, 14 insertions(+), 85 deletions(-)

diff --git
a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java index 3df34f1..4413b2d 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java @@ -137,8 +137,6 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { deleteStream(); awsKinesisClient.shutdown(); - - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); } diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java index 4930870..965803f 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java @@ -82,8 +82,6 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { } catch (Exception e) { LOG.warn("Unable to delete bucked: {}", e.getMessage(), e); } - - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); } private boolean checkRecord(ConsumerRecord<String, String> record) { diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java index 93b07a5..1199ddb 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java +++ 
b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java @@ -36,7 +36,6 @@ import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.TestUtils; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -80,11 +79,6 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest { received = 0; } - @AfterEach - public void tearDown() { - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); - } - private boolean checkMessages(List<Message> messages) { for (Message message : messages) { LOG.info("Received: {}", message.getBody()); diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java index 0125bd3..db2dd6d 100644 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java @@ -82,7 +82,6 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { @AfterEach public void tearDown() { - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); if (!awssqsClient.deleteQueue(queueName)) { fail("Failed to delete queue"); } diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java index cbef4be..2c26d09 100644 --- 
a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java +++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java @@ -74,8 +74,6 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { @AfterEach public void tearDown() { - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); - if (!awssqsClient.deleteQueue(queueName)) { fail("Failed to delete queue"); } diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java index ac8dcd3..33b8792 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java @@ -164,8 +164,6 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { @AfterEach public void tearDown() { deleteStream(); - - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); } private boolean checkRecord(ConsumerRecord<String, String> record) { diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java index e0374be..0cdf1b6 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java @@ -79,8 +79,6 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest { if (testDataDao != null) { testDataDao.dropTable(); } - - 
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); } private void putRecords(CountDownLatch latch) { diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java index 3e8f887..707f923 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java @@ -80,8 +80,6 @@ public class CamelSourceCassandraITCase extends AbstractKafkaTest { if (testDataDao != null) { testDataDao.dropTable(); } - - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); } private <T> boolean checkRecord(ConsumerRecord<String, T> record) { diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java index d1aa6c6..db9e2d2 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java @@ -17,21 +17,16 @@ package org.apache.camel.kafkaconnector.common; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.services.kafka.KafkaService; import org.apache.camel.kafkaconnector.common.services.kafka.KafkaServiceFactory; import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectRunnerFactory; import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService; import org.apache.camel.kafkaconnector.common.utils.PropertyUtils; import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; 
-import org.slf4j.LoggerFactory; import org.testcontainers.junit.jupiter.Testcontainers; @Testcontainers public abstract class AbstractKafkaTest { - private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaTest.class); - @RegisterExtension public final KafkaService kafkaService; @@ -61,13 +56,4 @@ public abstract class AbstractKafkaTest { public KafkaConnectService getKafkaConnectService() { return kafkaConnectService; } - - protected void deleteKafkaTopic(String topic) { - try { - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.deleteTopic(topic); - } catch (Throwable t) { - LOG.warn("Topic not deleted (probably the Kafka test cluster was already shutting down?).", t); - } - } } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java index fe5d9b5..dadab5a 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.services.kafka.EmbeddedKafkaService; import org.apache.camel.kafkaconnector.common.services.kafka.KafkaService; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.connect.runtime.AbstractStatus; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; @@ -81,6 +82,11 @@ public class KafkaConnectEmbedded implements KafkaConnectService { public void stop() { if (connectorName != null) { try { + LOG.info("Removing topics used during 
the test"); + Admin client = cluster.kafka().createAdminClient(); + + client.deleteTopics(cluster.connectorTopics(connectorName).topics()); + LOG.info("Removing connector {}", connectorName); cluster.deleteConnector(connectorName); } finally { diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java index 754dbd7..121be12 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java @@ -27,6 +27,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; @@ -214,6 +215,13 @@ class KafkaConnectRunner { */ public void stop() { if (connect != null) { + LOG.info("Removing topics used during the test"); + KafkaClient kafkaClient = new KafkaClient(bootstrapServer); + + for (String connector : herder.connectors()) { + herder.connectorActiveTopics(connector).topics().forEach(t -> kafkaClient.deleteTopic(t)); + } + connect.stop(); } else { LOG.warn("Trying to stop an uninitialized Kafka Connect Runner"); diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java index aca155e..80834dd 100644 --- 
a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java +++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java @@ -33,7 +33,6 @@ import org.apache.camel.kafkaconnector.elasticsearch.services.ElasticSearchServi import org.apache.camel.kafkaconnector.elasticsearch.services.ElasticSearchServiceFactory; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -72,11 +71,6 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest { received = 0; } - @AfterEach - public void tearDown() { - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); - } - private void putRecords(CountDownLatch latch) { KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); diff --git a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java index 3300571..adbc9fd 100644 --- a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java +++ b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java @@ -69,7 +69,6 @@ public class CamelSinkFileITCase extends AbstractKafkaTest { @AfterEach public void tearDown() { cleanup(); - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); } private void cleanup() { diff --git a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java index 7151484..f23ac4d 100644 --- 
a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java +++ b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java @@ -81,8 +81,6 @@ public class CamelSinkHTTPITCase extends AbstractKafkaTest { } finally { localServer.shutdown(2, TimeUnit.SECONDS); } - - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); } diff --git a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java index f49635d..6f05bde 100644 --- a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java +++ b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java @@ -119,8 +119,6 @@ public class CamelSinkSalesforceITCase extends AbstractKafkaTest { } accountName = null; - - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); } diff --git a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java index 40cb51e..d0e542a 100644 --- a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java +++ b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java @@ -133,8 +133,6 @@ public class CamelSourceSalesforceITCase extends AbstractKafkaTest { fail("Unable to delete the test account on Salesforce"); } account = null; - - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); } private <T> boolean checkRecord(ConsumerRecord<String, T> record) { diff --git 
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java index 12d2198..0f3db92 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java @@ -35,7 +35,6 @@ import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient; import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common; import org.apache.camel.kafkaconnector.sjms2.services.JMSService; import org.apache.camel.kafkaconnector.sjms2.services.JMSServiceFactory; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -71,11 +70,6 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest { received = 0; } - @AfterEach - public void tearDown() { - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); - } - private boolean checkRecord(Message jmsMessage) { if (jmsMessage instanceof TextMessage) { try { diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java index 40896c4..1dd8cae 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java @@ -30,7 +30,6 @@ import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common; import org.apache.camel.kafkaconnector.sjms2.services.JMSService; import org.apache.camel.kafkaconnector.sjms2.services.JMSServiceFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.AfterEach; import 
org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -67,11 +66,6 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest { received = 0; } - @AfterEach - public void tearDown() { - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); - } - private <T> boolean checkRecord(ConsumerRecord<String, T> record) { LOG.debug("Received: {}", record.value()); received++; diff --git a/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java b/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java index 171f3ba..4c9d7de 100644 --- a/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java +++ b/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java @@ -23,7 +23,6 @@ import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.TestUtils; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; @@ -55,11 +54,6 @@ public class CamelSinkSlackITCase extends AbstractKafkaTest { return new String[]{"camel-slack-kafka-connector"}; } - @AfterEach - public void tearDown() { - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); - } - private void runTest(ConnectorPropertyFactory connectorPropertyFactory, String message) throws ExecutionException, InterruptedException { connectorPropertyFactory.log(); getKafkaConnectService().initializeConnector(connectorPropertyFactory); diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java 
b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java index 24587b7..a6a1ff5 100644 --- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java +++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java @@ -22,7 +22,6 @@ import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.syslog.services.SyslogService; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -61,11 +60,6 @@ public class CamelSinkSyslogITCase extends AbstractKafkaTest { received = 0; } - @AfterEach - public void tearDown() { - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); - } - private void runBasicProduceTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception { connectorPropertyFactory.log(); getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java index a54d5cd..16df742 100644 --- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java +++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java @@ -17,8 +17,6 @@ package org.apache.camel.kafkaconnector.syslog.source; -import java.io.IOException; - import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.syslog.SyslogDataFormat; @@ -30,7 +28,6 @@ import 
org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -65,11 +62,6 @@ public class CamelSourceSyslogITCase extends AbstractKafkaTest { received = 0; } - @AfterEach - public void tearDown() throws IOException, InterruptedException { - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); - } - private void produceLogMessages(String protocol, String host, String port, String message) throws Exception { CamelContext camelContext = new DefaultCamelContext(); diff --git a/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java b/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java index b9f6f2a..9f89ff1 100644 --- a/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java +++ b/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java @@ -17,7 +17,6 @@ package org.apache.camel.kafkaconnector.timer.source; -import java.io.IOException; import java.util.concurrent.ExecutionException; import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; @@ -25,7 +24,6 @@ import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -56,11 +54,6 @@ public class 
CamelSourceTimerITCase extends AbstractKafkaTest { received = 0; } - @AfterEach - public void tearDown() throws IOException, InterruptedException { - deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())); - } - private boolean checkRecord(ConsumerRecord<String, String> record) { received++;