This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 711d6c1 Avoid blocking connector initialization on SSH tests as it
leads to failures on GH actions
711d6c1 is described below
commit 711d6c17f38e74d3cd96fd7beb2336fd6910ec03
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Feb 12 16:08:21 2021 +0100
Avoid blocking connector initialization on SSH tests as it leads to
failures on GH actions
---
.../common/test/CamelSinkTestSupport.java | 25 ++++++++++++++++++++++
.../ssh/sink/CamelSinkSshITCase.java | 2 +-
2 files changed, 26 insertions(+), 1 deletion(-)
diff --git
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
index b414726..ec9e9dc 100644
---
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
+++
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
@@ -97,6 +97,31 @@ public abstract class CamelSinkTestSupport extends
AbstractKafkaTest {
verifyMessages(latch);
}
+ /**
+ * A simple test runner that follows the steps: initialize, start
consumer, produce messages, verify results
+ *
+ * @param connectorPropertyFactory A factory for connector properties
+ * @throws Exception For test-specific exceptions
+ */
+ protected void runTestNonBlocking(ConnectorPropertyFactory
connectorPropertyFactory, TestMessageProducer producer) throws Exception {
+ connectorPropertyFactory.log();
+ getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+ LOG.debug("Creating the consumer ...");
+ ExecutorService service = Executors.newCachedThreadPool();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ service.submit(() -> consumeMessages(latch));
+
+ producer.produceMessages();
+
+ LOG.debug("Waiting for the messages to be processed");
+ service.shutdown();
+
+ LOG.debug("Waiting for the test to complete");
+ verifyMessages(latch);
+ }
+
protected boolean waitForData() {
try {
diff --git
a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index 02f6f21..abfdccb 100644
---
a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++
b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -94,6 +94,6 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
.withUsername("root")
.withPassword("root");
- runTest(connectorPropertyFactory, new
CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
+ runTestNonBlocking(connectorPropertyFactory, new
CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
}
}