This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 206b51e8bfd9b71c35b523841e1867c386ca856f Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Tue Dec 1 11:29:47 2020 +0100 Added Camel-SQL Kafka Connector integration tests --- tests/itests-sql/pom.xml | 60 +++++++++ .../kafkaconnector/sql/client/DatabaseClient.java | 70 ++++++++++ .../sql/services/SQLLocalContainerService.java | 61 +++++++++ .../sql/services/SQLRemoteService.java | 41 ++++++ .../kafkaconnector/sql/services/SQLService.java | 47 +++++++ .../sql/services/SQLServiceFactory.java | 44 ++++++ .../sql/services/TestDataSource.java | 41 ++++++ .../sql/sink/CamelSinkSQLITCase.java | 149 +++++++++++++++++++++ .../sql/sink/CamelSqlPropertyFactory.java | 44 ++++++ .../sql/source/CamelSourceSQLITCase.java | 93 +++++++++++++ .../sql/source/CamelSqlPropertyFactory.java | 44 ++++++ tests/itests-sql/src/test/resources/schema.sql | 24 ++++ 12 files changed, 718 insertions(+) diff --git a/tests/itests-sql/pom.xml b/tests/itests-sql/pom.xml new file mode 100644 index 0000000..dbcf4af --- /dev/null +++ b/tests/itests-sql/pom.xml @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../itests-parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>itests-sql</artifactId> + <name>Camel-Kafka-Connector :: Tests :: SQL</name> + + <dependencies> + <dependency> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-sql</artifactId> + </dependency> + + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>postgresql</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + +</project> diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/client/DatabaseClient.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/client/DatabaseClient.java new file mode 100644 index 0000000..f3e8c21 --- /dev/null +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/client/DatabaseClient.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sql.client; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.function.Consumer; + +import org.postgresql.ds.PGSimpleDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DatabaseClient { + private static final Logger LOG = LoggerFactory.getLogger(DatabaseClient.class); + private PGSimpleDataSource datasource; + private final Connection connection; + + public DatabaseClient(String url) throws SQLException { + LOG.info("Opening a new database connection using the URL {}", url); + + datasource = new PGSimpleDataSource(); + datasource.setURL(url); + datasource.setUser("ckc"); + datasource.setPassword("ckcDevel123"); + connection = datasource.getConnection(); + } + + public void runQuery(String query, Consumer<ResultSet> consumer) throws SQLException { + try (ResultSet rs = connection.prepareStatement(query).executeQuery()) { + + while (rs.next()) { + consumer.accept(rs); + } + } + } + + public int count(String table) throws SQLException { + String query = String.format("select count(*) as count from %s", table); + + try (ResultSet rs = connection.prepareStatement(query).executeQuery()) { + while (rs.next()) { + return rs.getInt("count"); + } + } + + return 0; + } + + public boolean hasAtLeastRecords(String table, int expected) throws SQLException { + int count = count(table); + + return count >= expected; + } +} diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLLocalContainerService.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLLocalContainerService.java new file mode 100644 index 0000000..a907d32 --- /dev/null +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLLocalContainerService.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sql.services; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.PostgreSQLContainer; + +public class SQLLocalContainerService implements SQLService { + private static final Logger LOG = LoggerFactory.getLogger(SQLLocalContainerService.class); + + private static JdbcDatabaseContainer container; + + public SQLLocalContainerService() { + container = new PostgreSQLContainer() + .withDatabaseName("camel") + .withUsername("ckc") + .withPassword("ckcDevel123") + .withInitScript("schema.sql") + .withStartupTimeoutSeconds(60); + + container.start(); + + System.setProperty("sql.url", container.getJdbcUrl()); + } + + @Override + public String sqlUrl() { + return container.getJdbcUrl(); + } + + @Override + public void initialize() { + LOG.info("Database instance available via JDBC url {}", container.getJdbcUrl()); + } + + @Override + public void shutdown() { + System.err.println("Shutdown"); + LOG.info("Stopping the database instance"); + container.stop(); + } + + +} diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLRemoteService.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLRemoteService.java new file mode 100644 index 0000000..47fd863 --- /dev/null +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLRemoteService.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sql.services; + +public class SQLRemoteService implements SQLService { + private static final String CONNECTION_URL; + + static { + CONNECTION_URL = System.getProperty("sql.connection.url"); + } + + @Override + public String sqlUrl() { + return CONNECTION_URL; + } + + @Override + public void initialize() { + // NO-OP + } + + @Override + public void shutdown() { + // NO-OP + } +} diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLService.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLService.java new file mode 100644 index 0000000..2e389a1 --- /dev/null +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLService.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sql.services; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public interface SQLService extends BeforeAllCallback, AfterAllCallback { + /** + * Perform any initialization necessary + */ + void initialize(); + + /** + * Shuts down the service after the test has completed + */ + void shutdown(); + + String sqlUrl(); + + + @Override + default void beforeAll(ExtensionContext extensionContext) throws Exception { + initialize(); + } + + @Override + default void afterAll(ExtensionContext extensionContext) throws Exception { + shutdown(); + } +} diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLServiceFactory.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLServiceFactory.java new file mode 100644 index 0000000..e4268f0 --- /dev/null +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/SQLServiceFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sql.services; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class SQLServiceFactory { + private static final Logger LOG = LoggerFactory.getLogger(SQLServiceFactory.class); + + private SQLServiceFactory() { + } + + + public static SQLService createService() { + String instanceType = System.getProperty("sql.instance.type"); + + if (instanceType == null || instanceType.equals("local-sql-container")) { + return new SQLLocalContainerService(); + } + + if (instanceType.equals("remote")) { + return new SQLRemoteService(); + } + + LOG.error("SQL instance must be one of 'local-sql-container' or 'remote"); + throw new UnsupportedOperationException("Invalid SQL instance type: " + instanceType); + } +} diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/TestDataSource.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/TestDataSource.java new file mode 100644 index 0000000..13eee72 --- /dev/null +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/services/TestDataSource.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sql.services; + +import org.postgresql.ds.PGSimpleDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestDataSource extends PGSimpleDataSource { + private static final Logger LOG = LoggerFactory.getLogger(TestDataSource.class); + + private static final String URL; + + static { + URL = System.getProperty("sql.url"); + } + + public TestDataSource() { + super(); + setUrl(URL); + + setUser("ckc"); + setPassword("ckcDevel123"); + + } +} diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java new file mode 100644 index 0000000..b61fa96 --- /dev/null +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sql.sink; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.kafkaconnector.CamelSinkTask; +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.sql.client.DatabaseClient; +import org.apache.camel.kafkaconnector.sql.services.SQLService; +import org.apache.camel.kafkaconnector.sql.services.SQLServiceFactory; +import org.apache.camel.kafkaconnector.sql.services.TestDataSource; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +@Testcontainers +public class CamelSinkSQLITCase extends AbstractKafkaTest { + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSQLITCase.class); + + @RegisterExtension + public SQLService sqlService = SQLServiceFactory.createService(); + + private final int expect = 1; + private int received; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-sql-kafka-connector"}; + } + + private void putRecords(CountDownLatch latch) { + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + + try { + for (int i = 0; i < expect; i++) { + Map<String, String> sqlParameters = new HashMap<>(); + + // The prefix 'CamelHeader' is removed by the SinkTask + sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100)); + sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + i); + + try { + kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test", sqlParameters); + } catch (ExecutionException e) { + LOG.error("Unable to produce messages: {}", e.getMessage(), e); + } catch (InterruptedException e) { + break; + } + } + } finally { + latch.countDown(); + } + } + + private void verifyData(ResultSet rs) { + try { + received++; + String testName = rs.getString("test_name"); + String testData = rs.getString("test_data"); + + assertTrue(testName.startsWith("SomeName"), String.format("Unexpected test name %s", testName)); + assertTrue(testData.startsWith("test data"), String.format("Unexpected test data %s", testData)); + + } catch (SQLException e) { + LOG.error("Unable to fetch record from result set: {}", e.getMessage(), e); + fail(String.format("Unable to fetch record from result set: %s", e.getMessage())); + } + } + + public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException { + propertyFactory.log(); + getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1); + + CountDownLatch latch = new CountDownLatch(1); + ExecutorService service = Executors.newCachedThreadPool(); + service.submit(() -> putRecords(latch)); + + if (!latch.await(30, TimeUnit.SECONDS)) { + fail("Timed out wait for data to be added to the Kafka cluster"); + } + + LOG.debug("Waiting for indices"); + + try { + DatabaseClient client = new DatabaseClient(sqlService.sqlUrl()); + + TestUtils.waitFor(() -> { + try { + return client.hasAtLeastRecords("test", expect); + } catch (SQLException e) { + LOG.warn("Failed to read the test table: {}", e.getMessage(), e); + return false; + } + }); + + client.runQuery("select * from test", this::verifyData); + } catch (SQLException e) { + LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e); + fail(e.getMessage()); + } + + assertEquals(expect, received, "Did not receive the same amount of messages sent"); + LOG.debug("Created the consumer ... About to receive messages"); + } + + @Test + public void testDBFetch() throws ExecutionException, InterruptedException { + CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic() + .withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName())) + .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)") + .withTopics(TestUtils.getDefaultTestTopic(this.getClass())); + + runTest(factory); + + } +} diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSqlPropertyFactory.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSqlPropertyFactory.java new file mode 100644 index 0000000..f7df3fc --- /dev/null +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSqlPropertyFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sql.sink; + + +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; + +public final class CamelSqlPropertyFactory extends SinkConnectorPropertyFactory<CamelSqlPropertyFactory> { + private CamelSqlPropertyFactory() { + + } + + public CamelSqlPropertyFactory withDataSource(String value) { + return setProperty("camel.component.sql.dataSource", value); + } + + public CamelSqlPropertyFactory withQuery(String value) { + return setProperty("camel.sink.path.query", value); + } + + public static CamelSqlPropertyFactory basic() { + return new CamelSqlPropertyFactory() + .withName("CamelSQLSinkConnector") + .withTasksMax(1) + .withConnectorClass("org.apache.camel.kafkaconnector.sql.CamelSqlSinkConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + } +} diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java new file mode 100644 index 0000000..8b877bf --- /dev/null +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sql.source; + +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.sql.client.DatabaseClient; +import org.apache.camel.kafkaconnector.sql.services.SQLService; +import org.apache.camel.kafkaconnector.sql.services.SQLServiceFactory; +import org.apache.camel.kafkaconnector.sql.services.TestDataSource; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.concurrent.ExecutionException; + +@Testcontainers +public class CamelSourceSQLITCase extends AbstractKafkaTest { + private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSQLITCase.class); + + @RegisterExtension + public SQLService sqlService = SQLServiceFactory.createService(); + + private final int expect = 1; + private int received; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-sql-kafka-connector"}; + } + + private <T> boolean checkRecord(ConsumerRecord<String, T> record) { + + LOG.debug("Received: {}", record.value()); + received++; + + return false; + } + + public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { + connectorPropertyFactory.log(); + + getKafkaConnectService().initializeConnector(connectorPropertyFactory); + + LOG.debug("Creating the consumer ..."); + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); + LOG.debug("Created the consumer ..."); + + assertEquals(received, expect, "Didn't process the expected amount of messages"); + } + + @Timeout(10) + @Test + public void testDBFetch() throws ExecutionException, InterruptedException { + CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic() + .withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName())) + .withQuery("select * from test") + .withTopics(TestUtils.getDefaultTestTopic(this.getClass())); + + runTest(factory); + + } +} diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java new file mode 100644 index 0000000..5618cf4 --- /dev/null +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sql.source; + + +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; + +public final class CamelSqlPropertyFactory extends SinkConnectorPropertyFactory<CamelSqlPropertyFactory> { + private CamelSqlPropertyFactory() { + + } + + public CamelSqlPropertyFactory withDataSource(String value) { + return setProperty("camel.component.sql.dataSource", value); + } + + public CamelSqlPropertyFactory withQuery(String value) { + return setProperty("camel.source.path.query", value); + } + + public static CamelSqlPropertyFactory basic() { + return new CamelSqlPropertyFactory() + .withName("CamelSQLSourceConnector") + .withTasksMax(1) + .withConnectorClass("org.apache.camel.kafkaconnector.sql.CamelSqlSourceConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + } +} diff --git a/tests/itests-sql/src/test/resources/schema.sql b/tests/itests-sql/src/test/resources/schema.sql new file mode 100644 index 0000000..832f6cd --- /dev/null +++ b/tests/itests-sql/src/test/resources/schema.sql @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE test ( + test_name VARCHAR(128) NOT NULL, + test_data VARCHAR(128) NOT NULL +); + +insert into test(test_name, test_data) values('SomeName','test data');