This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 114dfe5990237f06042755a717d01354d6f47d0f
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Feb 3 13:43:45 2021 +0100

    Convert the JDBC tests to the new reusable sink test base class
---
 .../jdbc/sink/CamelSinkJDBCITCase.java             | 121 ++++++++++-----------
 1 file changed, 58 insertions(+), 63 deletions(-)

diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
index 87752a1..3663890 100644
--- a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
@@ -22,22 +22,19 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient;
 import org.apache.camel.kafkaconnector.jdbc.services.TestDataSource;
 import org.apache.camel.test.infra.jdbc.services.JDBCService;
 import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,11 +46,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkJDBCITCase extends AbstractKafkaTest {
+public class CamelSinkJDBCITCase extends CamelSinkTestSupport {
     @RegisterExtension
     static JDBCService jdbcService;
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJDBCITCase.class);
 
+    private DatabaseClient client;
+    private String topicName;
     private final int expect = 10;
     private int received;
 
@@ -74,36 +73,69 @@ public class CamelSinkJDBCITCase extends AbstractKafkaTest {
                 .build();
     }
 
+    @BeforeEach
+    public void setUp() throws SQLException {
+        topicName = getTopicForTest(this);
+        client = new DatabaseClient(jdbcService.jdbcUrl());
+        received = 0;
+    }
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-jdbc-kafka-connector"};
     }
 
-    private void putRecords(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        String body = "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
+    @Override
+    protected String testMessageContent(int current) {
+        return "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
+    }
 
-        try {
-            for (int i = 0; i < expect; i++) {
-                Map<String, String> jdbcParameters = new HashMap<>();
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> jdbcParameters = new HashMap<>();
 
-                // The prefix 'CamelHeader' is removed by the SinkTask
-                jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
-                jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + i);
+        // The prefix 'CamelHeader' is removed by the SinkTask
+        jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+        jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
+
+        return jdbcParameters;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            LOG.debug("Waiting for indices");
+
+            TestUtils.waitFor(() -> {
                 try {
-                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), body, jdbcParameters);
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
+                    return client.hasAtLeastRecords("test", expect);
+                } catch (SQLException e) {
+                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
+                    return false;
                 }
-            }
+            });
         } finally {
             latch.countDown();
         }
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(20, TimeUnit.SECONDS)) {
+            try {
+                client.runQuery("select * from test", this::verifyData);
+
+                assertEquals(expect, received, "Did not receive the same amount of messages sent");
+            } catch (SQLException e) {
+                fail(e.getMessage());
+            }
+        } else {
+            fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
+                    received, expect));
+        }
+    }
+
     private void verifyData(ResultSet rs) {
         try {
             received++;
@@ -112,58 +144,21 @@ public class CamelSinkJDBCITCase extends AbstractKafkaTest {
             String testName = rs.getString("test_name");
             String testData = rs.getString("test_data");
 
             assertTrue(testName.startsWith("SomeName"), String.format("Unexpected test name %s", testName));
             assertTrue(testData.startsWith("test data"), String.format("Unexpected test data %s", testData));
-
         } catch (SQLException e) {
             LOG.error("Unable to fetch record from result set: {}", e.getMessage(), e);
             fail(String.format("Unable to fetch record from result set: %s", e.getMessage()));
         }
     }
 
-    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
-        propertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
-
-        CountDownLatch latch = new CountDownLatch(1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(() -> putRecords(latch));
-
-        if (!latch.await(30, TimeUnit.SECONDS)) {
-            fail("Timed out wait for data to be added to the Kafka cluster");
-        }
-
-        LOG.debug("Waiting for indices");
-
-        try {
-            DatabaseClient client = new DatabaseClient(jdbcService.jdbcUrl());
-
-            TestUtils.waitFor(() -> {
-                try {
-                    return client.hasAtLeastRecords("test", expect);
-                } catch (SQLException e) {
-                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
-                    return false;
-                }
-            });
-
-            client.runQuery("select * from test", this::verifyData);
-        } catch (SQLException e) {
-            LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-
-        assertEquals(expect, received, "Did not receive the same amount of messages sent");
-        LOG.debug("Created the consumer ... About to receive messages");
-    }
-
+    @Timeout(30)
     @Test
-    public void testDBFetch() throws ExecutionException, InterruptedException {
+    public void testDBFetch() throws Exception {
         CamelJDBCPropertyFactory factory = CamelJDBCPropertyFactory.basic()
                 .withDataSource(CamelJDBCPropertyFactory.classRef(TestDataSource.class.getName()))
                 .withDataSourceName("someName")
                 .withUseHeaderAsParameters(true)
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
-
-        runTest(factory);
+                .withTopics(topicName);
+        runTest(factory, topicName, expect);
     }
 }
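
Note on the new base class: CamelSinkTestSupport itself is not part of this
diff, only the JDBC test that now extends it. For readers who want the shape of
the pattern, the sketch below outlines the template-method structure implied by
the overrides above (testMessageContent, messageHeaders, consumeMessages,
verifyMessages) and the runTest(factory, topicName, expect) call. The class
name, the produceOne hook, and the latch coordination here are illustrative
assumptions only, not the actual implementation; connector initialization via
the ConnectorPropertyFactory is omitted.

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not the actual CamelSinkTestSupport implementation.
public abstract class SinkTestTemplateSketch {

    // Body of the Kafka record at position 'current'.
    protected abstract String testMessageContent(int current);

    // Per-record headers, e.g. the CamelHeader-prefixed JDBC parameters above.
    protected abstract Map<String, String> messageHeaders(String text, int current);

    // Blocks until the sink endpoint holds the expected data, then releases the latch.
    protected abstract void consumeMessages(CountDownLatch latch);

    // Waits on the latch and runs assertions against the sink endpoint.
    protected abstract void verifyMessages(CountDownLatch latch) throws InterruptedException;

    // Assumed hook standing in for producing one record to the Kafka topic.
    protected abstract void produceOne(String topicName, String body, Map<String, String> headers);

    // Template method: build 'count' records from the hooks, watch the sink
    // endpoint from a worker thread, then verify. Connector setup is omitted.
    protected void runTest(String topicName, int count) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < count; i++) {
                String body = testMessageContent(i);
                produceOne(topicName, body, messageHeaders(body, i));
            }
            service.submit(() -> consumeMessages(latch));
            verifyMessages(latch);
        } finally {
            service.shutdownNow();
        }
    }
}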