This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 18c5417e6dd055dea47f705339dafc6c0a1ffc57
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Feb 3 15:38:01 2021 +0100

    Convert the SQL tests to the new reusable sink test base class
---
 .../sql/sink/CamelSinkSQLITCase.java               | 118 ++++++++++-----------
 1 file changed, 55 insertions(+), 63 deletions(-)

diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
index 53fcca7..79bf8f9 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
@@ -22,20 +22,16 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.sql.client.DatabaseClient;
 import org.apache.camel.kafkaconnector.sql.services.TestDataSource;
 import org.apache.camel.test.infra.jdbc.services.JDBCService;
 import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
@@ -47,12 +43,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class CamelSinkSQLITCase extends AbstractKafkaTest {
+public class CamelSinkSQLITCase extends CamelSinkTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSQLITCase.class);
 
     @RegisterExtension
     public JDBCService sqlService;
 
+    private DatabaseClient client;
+    private String topicName;
     private final int expect = 1;
     private int received;
 
@@ -76,30 +74,58 @@ public class CamelSinkSQLITCase extends AbstractKafkaTest {
         return new String[] {"camel-sql-kafka-connector"};
     }
 
-    private void putRecords(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @BeforeEach
+    public void setUp() throws SQLException {
+        topicName = getTopicForTest(this);
+        client = new DatabaseClient(sqlService.jdbcUrl());
+    }
 
-        try {
-            for (int i = 0; i < expect; i++) {
-                Map<String, String> sqlParameters = new HashMap<>();
+    @Override
+    protected String testMessageContent(int current) {
+        return "test";
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> sqlParameters = new HashMap<>();
 
-                // The prefix 'CamelHeader' is removed by the SinkTask
-                sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
-                sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + i);
+        // The prefix 'CamelHeader' is removed by the SinkTask
+        sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+        sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
 
+        return sqlParameters;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            TestUtils.waitFor(() -> {
                 try {
-                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test", sqlParameters);
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
+                    return client.hasAtLeastRecords("test", expect);
+                } catch (SQLException e) {
+                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
+                    return false;
                 }
-            }
+            });
         } finally {
             latch.countDown();
         }
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(25, TimeUnit.SECONDS)) {
+            try {
+                client.runQuery("select * from test", this::verifyData);
+                assertEquals(expect, received, "Did not receive as much data as expected");
+            } catch (SQLException e) {
+                fail(e.getMessage());
+            }
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
     private void verifyData(ResultSet rs) {
         try {
             received++;
@@ -115,48 +141,14 @@ public class CamelSinkSQLITCase extends AbstractKafkaTest {
         }
     }
 
-    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
-        propertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
-
-        CountDownLatch latch = new CountDownLatch(1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(() -> putRecords(latch));
-
-        if (!latch.await(30, TimeUnit.SECONDS)) {
-            fail("Timed out wait for data to be added to the Kafka cluster");
-        }
-
-        LOG.debug("Waiting for indices");
-
-        try {
-            DatabaseClient client = new DatabaseClient(sqlService.jdbcUrl());
-
-            TestUtils.waitFor(() -> {
-                try {
-                    return client.hasAtLeastRecords("test", expect);
-                } catch (SQLException e) {
-                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
-                    return false;
-                }
-            });
-
-            client.runQuery("select * from test", this::verifyData);
-        } catch (SQLException e) {
-            LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-
-        assertEquals(expect, received, "Did not receive the same amount of messages sent");
-        LOG.debug("Created the consumer ... About to receive messages");
-    }
-
     @Test
-    public void testDBFetch() throws ExecutionException, InterruptedException {
-        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic().withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
-                .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)").withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
-
-        runTest(factory);
-
+    public void testDBFetch() throws Exception {
+        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory
+                .basic()
+                .withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
+                .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)")
+                .withTopics(topicName);
+
+        runTest(factory, topicName, expect);
    }
 }
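
For readers following this refactoring series, the template-method contract that CamelSinkSQLITCase now implements can be inferred from the overrides in the diff. The sketch below is only an illustration: the overridable method signatures and the runTest(factory, topicName, expect) entry point are taken from this diff, while the base-class body (how it starts the connector and produces the Kafka records) is an assumption, not the actual CamelSinkTestSupport source.

    import java.util.Map;
    import java.util.concurrent.CountDownLatch;

    import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
    import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;

    // Hypothetical outline of the reusable sink test base class.
    public abstract class CamelSinkTestSupport extends AbstractKafkaTest {

        // Payload of the n-th test message sent to the Kafka topic.
        protected abstract String testMessageContent(int current);

        // Headers for the n-th message; entries prefixed with
        // CamelSinkTask.HEADER_CAMEL_PREFIX become Camel message headers.
        protected abstract Map<String, String> messageHeaders(String text, int current);

        // Blocks until the sink has written the data, then releases the latch.
        protected abstract void consumeMessages(CountDownLatch latch);

        // Awaits the latch and asserts on the consumed data.
        protected abstract void verifyMessages(CountDownLatch latch) throws InterruptedException;

        // Assumed flow: start the connector described by the property factory,
        // produce `count` records built from testMessageContent() and
        // messageHeaders() to topicName, then run consumeMessages() and
        // verifyMessages() against a shared latch.
        public void runTest(ConnectorPropertyFactory factory, String topicName, int count) throws Exception {
            // ... omitted: see the real CamelSinkTestSupport in the
            // camel-kafka-connector repository for the actual implementation.
        }
    }

With production and orchestration in the base class, each connector test only declares its message shape and its data-store assertions, which is why the diff nets out at 55 insertions against 63 deletions.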