This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 54cb1e2 Added tests for JDBC sink new e385af8 Merge pull request #316 from orpiske/jdbc-tests 54cb1e2 is described below commit 54cb1e2530007fefaef5d4d6f0b55e5bab3cb5b2 Author: Otavio R. Piske <angusyo...@gmail.com> AuthorDate: Thu Mar 19 20:39:02 2020 +0100 Added tests for JDBC sink --- README.adoc | 3 +- parent/pom.xml | 13 ++ .../common/clients/kafka/KafkaClient.java | 43 ++++++ tests/itests-jdbc/pom.xml | 62 +++++++++ .../kafkaconnector/jdbc/client/DatabaseClient.java | 69 ++++++++++ .../jdbc/services/JDBCLocalContainerService.java | 61 +++++++++ .../jdbc/services/JDBCRemoteService.java | 45 +++++++ .../kafkaconnector/jdbc/services/JDBCService.java | 51 +++++++ .../jdbc/services/JDBCServiceFactory.java | 43 ++++++ .../jdbc/services/TestDataSource.java | 41 ++++++ .../jdbc/sink/CamelJDBCPropertyFactory.java | 48 +++++++ .../jdbc/sink/CamelSinkJDBCITCase.java | 150 +++++++++++++++++++++ tests/itests-jdbc/src/test/resources/schema.sql | 22 +++ tests/pom.xml | 1 + 14 files changed, 651 insertions(+), 1 deletion(-) diff --git a/README.adoc b/README.adoc index 4ce0f87..2c56074 100644 --- a/README.adoc +++ b/README.adoc @@ -57,6 +57,8 @@ for remote testing: * cassandra.instance.type ** cassandra.host ** cassandra.cql3.port +* jdbc.instance.type +** jdbc.connection.url * jms-service.instance.type ** jms.broker.address * hdfs.instance.type @@ -66,7 +68,6 @@ for remote testing: ** mongodb.host ** mongodb.port - Additionally, a few manual tests can be enabled and executed with adequate configuration on the accounts and environments used by those services. This is very specific to the nature of each of those services, therefore please consult the comments on each of those test cases for the details related to their setup. diff --git a/parent/pom.xml b/parent/pom.xml index 8f1dae3..3a0c74b 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -32,6 +32,7 @@ <version.java>1.8</version.java> <version.guava>20.0</version.guava> <version.javax.annotation-api>1.3.2</version.javax.annotation-api> + <version.postgres>42.2.14</version.postgres> <version.maven.compiler>3.8.1</version.maven.compiler> <version.maven.javadoc>3.1.1</version.maven.javadoc> @@ -301,6 +302,18 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>postgresql</artifactId> + <version>${testcontainers-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>${version.postgres}</version> + <scope>test</scope> + </dependency> </dependencies> </dependencyManagement> <build> diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java index feaf4e2..e9846ed 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java @@ -20,6 +20,7 @@ package org.apache.camel.kafkaconnector.common.clients.kafka; import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -32,6 +33,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Header; /** * A very simple test message consumer that can consume messages of different types @@ -45,6 +47,26 @@ public class KafkaClient<K, V> { private KafkaProducer<K, V> producer; private KafkaConsumer<K, V> consumer; + private static class TestHeader implements Header { + private final String key; + private final String value; + + public TestHeader(String key, String value) { + this.key = key; + this.value = value; + } + + @Override + public String key() { + return this.key; + } + + @Override + public byte[] value() { + return value.getBytes(); + } + } + /** * Constructs the properties using the given bootstrap server @@ -98,6 +120,27 @@ public class KafkaClient<K, V> { future.get(); } + + /** + * Sends data to a topic + * + * @param topic the topic to send data to + * @param message the message to send + * @throws ExecutionException + * @throws InterruptedException + */ + public void produce(String topic, V message, Map<String, String> headers) throws ExecutionException, InterruptedException { + ProducerRecord<K, V> record = new ProducerRecord<>(topic, message); + + for (Map.Entry<String, String> entry : headers.entrySet()) { + record.headers().add(new TestHeader(entry.getKey(), entry.getValue())); + } + + Future<RecordMetadata> future = producer.send(record); + + future.get(); + } + /** * Delete a topic * diff --git a/tests/itests-jdbc/pom.xml b/tests/itests-jdbc/pom.xml new file mode 100644 index 0000000..960ced5 --- /dev/null +++ b/tests/itests-jdbc/pom.xml @@ -0,0 +1,62 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-parent</artifactId> + <version>0.4.0-SNAPSHOT</version> + <relativePath>../itests-parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>itests-jdbc</artifactId> + <name>Camel-Kafka-Connector :: Tests :: JDBC</name> + + <dependencies> + <dependency> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-jdbc</artifactId> + </dependency> + + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>postgresql</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + +</project> \ No newline at end of file diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/client/DatabaseClient.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/client/DatabaseClient.java new file mode 100644 index 0000000..6f2f452 --- /dev/null +++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/client/DatabaseClient.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.jdbc.client; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.function.Consumer; + +import org.postgresql.ds.PGSimpleDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DatabaseClient { + private static final Logger LOG = LoggerFactory.getLogger(DatabaseClient.class); + private PGSimpleDataSource datasource; + private final Connection connection; + + public DatabaseClient(String url) throws SQLException { + LOG.info("Opening a new database connection using the URL {}", url); + + datasource = new PGSimpleDataSource(); + datasource.setURL(url); + datasource.setUser("ckc"); + datasource.setPassword("ckcDevel123"); + connection = datasource.getConnection(); + } + + public void runQuery(String query, Consumer<ResultSet> consumer) throws SQLException { + ResultSet rs = connection.prepareStatement(query).executeQuery(); + + while (rs.next()) { + consumer.accept(rs); + } + } + + public int count(String table) throws SQLException { + String query = String.format("select count(*) as count from %s", table); + + ResultSet rs = connection.prepareStatement(query).executeQuery(); + + while (rs.next()) { + return rs.getInt("count"); + } + + return 0; + } + + public boolean hasAtLeastRecords(String table, int expected) throws SQLException { + int count = count(table); + + return count >= expected; + } +} diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCLocalContainerService.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCLocalContainerService.java new file mode 100644 index 0000000..e67c72c --- /dev/null +++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCLocalContainerService.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.jdbc.services; + +import java.sql.SQLException; + +import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.PostgreSQLContainer; + +public class JDBCLocalContainerService implements JDBCService { + private static final Logger LOG = LoggerFactory.getLogger(JDBCLocalContainerService.class); + + private static JdbcDatabaseContainer container; + + public JDBCLocalContainerService() { + container = new PostgreSQLContainer() + .withDatabaseName("camel") + .withUsername("ckc") + .withPassword("ckcDevel123") + .withInitScript("schema.sql") + .withStartupTimeoutSeconds(60); + + container.start(); + + System.setProperty("jdbc.url", container.getJdbcUrl()); + } + + @Override + public DatabaseClient getClient() throws SQLException { + return new DatabaseClient(container.getJdbcUrl()); + } + + @Override + public void initialize() { + LOG.info("Database instance available via JDBC url {}", container.getJdbcUrl()); + } + + @Override + public void shutdown() { + LOG.info("Stopping the database instance"); + container.stop(); + } +} diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCRemoteService.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCRemoteService.java new file mode 100644 index 0000000..12956da --- /dev/null +++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCRemoteService.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.jdbc.services; + +import java.sql.SQLException; + +import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient; + +public class JDBCRemoteService implements JDBCService { + private static final String CONNECTION_URL; + + static { + CONNECTION_URL = System.getProperty("jdbc.connection.url"); + } + + @Override + public void initialize() { + // NO-OP + } + + @Override + public void shutdown() { + // NO-OP + } + + @Override + public DatabaseClient getClient() throws SQLException { + return new DatabaseClient(CONNECTION_URL); + } +} diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCService.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCService.java new file mode 100644 index 0000000..c970453 --- /dev/null +++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCService.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.jdbc.services; + +import java.sql.SQLException; + +import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public interface JDBCService extends BeforeAllCallback, AfterAllCallback { + /** + * Perform any initialization necessary + */ + void initialize(); + + /** + * Shuts down the service after the test has completed + */ + void shutdown(); + + + DatabaseClient getClient() throws SQLException; + + + @Override + default void beforeAll(ExtensionContext extensionContext) throws Exception { + initialize(); + } + + @Override + default void afterAll(ExtensionContext extensionContext) throws Exception { + shutdown(); + } +} diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCServiceFactory.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCServiceFactory.java new file mode 100644 index 0000000..ff7ef02 --- /dev/null +++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/JDBCServiceFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.jdbc.services; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class JDBCServiceFactory { + private static final Logger LOG = LoggerFactory.getLogger(JDBCServiceFactory.class); + + private JDBCServiceFactory() { + } + + public static JDBCService createService() { + String instanceType = System.getProperty("jdbc.instance.type"); + + if (instanceType == null || instanceType.equals("local-jdbc-container")) { + return new JDBCLocalContainerService(); + } + + if (instanceType.equals("remote")) { + return new JDBCRemoteService(); + } + + LOG.error("JDBC instance must be one of 'local-jdbc-container' or 'remote"); + throw new UnsupportedOperationException("Invalid JDBC instance type: " + instanceType); + } +} diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/TestDataSource.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/TestDataSource.java new file mode 100644 index 0000000..114bd67 --- /dev/null +++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/services/TestDataSource.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.jdbc.services; + +import org.postgresql.ds.PGSimpleDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestDataSource extends PGSimpleDataSource { + private static final Logger LOG = LoggerFactory.getLogger(TestDataSource.class); + + private static final String URL; + + static { + URL = System.getProperty("jdbc.url"); + } + + public TestDataSource() { + super(); + setUrl(URL); + + setUser("ckc"); + setPassword("ckcDevel123"); + + } +} diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelJDBCPropertyFactory.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelJDBCPropertyFactory.java new file mode 100644 index 0000000..15e4c73 --- /dev/null +++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelJDBCPropertyFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.jdbc.sink; + + +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; + +public final class CamelJDBCPropertyFactory extends SinkConnectorPropertyFactory<CamelJDBCPropertyFactory> { + private CamelJDBCPropertyFactory() { + + } + + public CamelJDBCPropertyFactory withDataSource(String value) { + return setProperty("camel.component.jdbc.dataSource", value); + } + + public CamelJDBCPropertyFactory withDataSourceName(String value) { + return setProperty("camel.sink.path.dataSourceName", value); + } + + public CamelJDBCPropertyFactory withUseHeaderAsParameters(boolean value) { + return setProperty("camel.sink.endpoint.useHeadersAsParameters", value); + } + + public static CamelJDBCPropertyFactory basic() { + return new CamelJDBCPropertyFactory() + .withName("CamelJDBCSinkConnector") + .withTasksMax(1) + .withConnectorClass("org.apache.camel.kafkaconnector.jdbc.CamelJdbcSinkConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + } +} diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java new file mode 100644 index 0000000..9830274 --- /dev/null +++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.jdbc.sink; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient; +import org.apache.camel.kafkaconnector.jdbc.services.JDBCService; +import org.apache.camel.kafkaconnector.jdbc.services.JDBCServiceFactory; +import org.apache.camel.kafkaconnector.jdbc.services.TestDataSource; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +@Testcontainers +public class CamelSinkJDBCITCase extends AbstractKafkaTest { + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJDBCITCase.class); + + @RegisterExtension + public JDBCService jdbcService = JDBCServiceFactory.createService(); + + private final int expect = 10; + private int received; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-jdbc-kafka-connector"}; + } + + private void putRecords(CountDownLatch latch) { + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + String body = "insert into test(test_name, test_data) values(:?TestName, :?TestData)"; + + try { + for (int i = 0; i < expect; i++) { + Map<String, String> jdbcParameters = new HashMap<>(); + + // The prefix 'CamelHeader' is removed by the SinkTask + jdbcParameters.put("CamelHeaderTestName", "SomeName" + TestUtils.randomWithRange(0, 100)); + jdbcParameters.put("CamelHeaderTestData", "test data " + i); + + try { + kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), body, jdbcParameters); + } catch (ExecutionException e) { + LOG.error("Unable to produce messages: {}", e.getMessage(), e); + } catch (InterruptedException e) { + break; + } + } + } finally { + latch.countDown(); + } + } + + private void verifyData(ResultSet rs) { + try { + received++; + String testName = rs.getString("test_name"); + String testData = rs.getString("test_data"); + + assertTrue(testName.startsWith("SomeName"), String.format("Unexpected test name %s", testName)); + assertTrue(testData.startsWith("test data"), String.format("Unexpected test data %s", testData)); + + } catch (SQLException e) { + LOG.error("Unable to fetch record from result set: {}", e.getMessage(), e); + fail(String.format("Unable to fetch record from result set: %s", e.getMessage())); + } + } + + public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException { + propertyFactory.log(); + getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1); + + CountDownLatch latch = new CountDownLatch(1); + ExecutorService service = Executors.newCachedThreadPool(); + service.submit(() -> putRecords(latch)); + + if (!latch.await(30, TimeUnit.SECONDS)) { + fail("Timed out wait for data to be added to the Kafka cluster"); + } + + LOG.debug("Waiting for indices"); + + try { + DatabaseClient client = jdbcService.getClient(); + + TestUtils.waitFor(() -> { + try { + return client.hasAtLeastRecords("test", expect); + } catch (SQLException e) { + LOG.warn("Failed to read the test table: {}", e.getMessage(), e); + return false; + } + }); + + client.runQuery("select * from test", this::verifyData); + } catch (SQLException e) { + LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e); + fail(e.getMessage()); + } + + assertEquals(expect, received, "Did not receive the same amount of messages sent"); + LOG.debug("Created the consumer ... About to receive messages"); + } + + @Test + public void testDBFetch() throws ExecutionException, InterruptedException { + CamelJDBCPropertyFactory factory = CamelJDBCPropertyFactory.basic() + .withDataSource(CamelJDBCPropertyFactory.classRef(TestDataSource.class.getName())) + .withDataSourceName("someName") + .withUseHeaderAsParameters(true) + .withTopics(TestUtils.getDefaultTestTopic(this.getClass())); + + runTest(factory); + + } +} diff --git a/tests/itests-jdbc/src/test/resources/schema.sql b/tests/itests-jdbc/src/test/resources/schema.sql new file mode 100644 index 0000000..f94398f --- /dev/null +++ b/tests/itests-jdbc/src/test/resources/schema.sql @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE test ( + test_name VARCHAR(128) NOT NULL, + test_data VARCHAR(128) NOT NULL +); \ No newline at end of file diff --git a/tests/pom.xml b/tests/pom.xml index f9ed354..90faefa 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -53,6 +53,7 @@ <module>itests-salesforce</module> <module>itests-hdfs</module> <module>itests-mongodb</module> + <module>itests-jdbc</module> </modules>