This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new d5527c5 Added a JDBC test case that does not use a custom test data source d5527c5 is described below commit d5527c52163911005fad185c031f1ad115677497 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Tue Jan 5 10:18:39 2021 +0100 Added a JDBC test case that does not use a custom test data source --- .../common/BasicConnectorPropertyFactory.java | 4 + .../common/ComponentConfigBuilder.java | 44 ++++++ .../jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java | 172 +++++++++++++++++++++ 3 files changed, 220 insertions(+) diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java index d5dd4f6..cf409e5 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java @@ -67,6 +67,10 @@ public abstract class BasicConnectorPropertyFactory<T extends BasicConnectorProp return new TransformsConfigBuilder<>((T) this, getProperties(), name); } + public ComponentConfigBuilder<T> withComponentConfig(String name, String value) { + return new ComponentConfigBuilder<>((T) this, getProperties(), name, value); + } + protected T setProperty(String name, Object value) { connectorProps.put(name, value); diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ComponentConfigBuilder.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ComponentConfigBuilder.java new file mode 100644 index 0000000..949910e --- /dev/null +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ComponentConfigBuilder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.common; + +import java.util.Properties; + +public class ComponentConfigBuilder<T extends ConnectorPropertyFactory> { + private final T handle; + private final Properties properties; + private final String name; + + public ComponentConfigBuilder(T handle, Properties properties, String name, String value) { + this.handle = handle; + this.properties = properties; + this.name = name; + + properties.put(name, value); + } + + public ComponentConfigBuilder<T> withEntry(String key, String value) { + properties.put(name + "." + key, value); + + return this; + } + + public T end() { + return handle; + } +} diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java new file mode 100644 index 0000000..880433e --- /dev/null +++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.jdbc.sink; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.kafkaconnector.CamelSinkTask; +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient; +import org.apache.camel.test.infra.jdbc.services.JDBCService; +import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +@Testcontainers +public class CamelSinkJDBCNoDataSourceITCase extends AbstractKafkaTest { + @RegisterExtension + static JDBCService jdbcService; + + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJDBCNoDataSourceITCase.class); + + private final int expect = 10; + private int received; + + static { + final String postgresImage = "postgres:9.6.2"; + + JdbcDatabaseContainer container = new PostgreSQLContainer(postgresImage) + .withDatabaseName("camel") + .withUsername("ckc") + .withPassword("ckcDevel123") + .withInitScript("schema.sql") + .withStartupTimeoutSeconds(60); + + // Let the JDBC Service handle container lifecycle + jdbcService = JDBCServiceBuilder.newBuilder() + .withContainer(container) + .build(); + } + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-jdbc-kafka-connector"}; + } + + private void putRecords(CountDownLatch latch) { + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + String body = "insert into test(test_name, test_data) values(:?TestName, :?TestData)"; + + try { + for (int i = 0; i < expect; i++) { + Map<String, String> jdbcParameters = new HashMap<>(); + + // The prefix 'CamelHeader' is removed by the SinkTask + jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100)); + jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + i); + + try { + kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), body, jdbcParameters); + } catch (ExecutionException e) { + LOG.error("Unable to produce messages: {}", e.getMessage(), e); + } catch (InterruptedException e) { + break; + } + } + } finally { + latch.countDown(); + } + } + + private void verifyData(ResultSet rs) { + try { + received++; + String testName = rs.getString("test_name"); + String testData = rs.getString("test_data"); + + assertTrue(testName.startsWith("SomeName"), String.format("Unexpected test name %s", testName)); + assertTrue(testData.startsWith("test data"), String.format("Unexpected test data %s", testData)); + + } catch (SQLException e) { + LOG.error("Unable to fetch record from result set: {}", e.getMessage(), e); + fail(String.format("Unable to fetch record from result set: %s", e.getMessage())); + } + } + + public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException { + propertyFactory.log(); + getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1); + + CountDownLatch latch = new CountDownLatch(1); + ExecutorService service = Executors.newCachedThreadPool(); + service.submit(() -> putRecords(latch)); + + if (!latch.await(30, TimeUnit.SECONDS)) { + fail("Timed out wait for data to be added to the Kafka cluster"); + } + + LOG.debug("Waiting for indices"); + + try { + DatabaseClient client = new DatabaseClient(jdbcService.jdbcUrl()); + + TestUtils.waitFor(() -> { + try { + return client.hasAtLeastRecords("test", expect); + } catch (SQLException e) { + LOG.warn("Failed to read the test table: {}", e.getMessage(), e); + return false; + } + }); + + client.runQuery("select * from test", this::verifyData); + } catch (SQLException e) { + LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e); + fail(e.getMessage()); + } + + assertEquals(expect, received, "Did not receive the same amount of messages sent"); + LOG.debug("Created the consumer ... About to receive messages"); + } + + @Test + public void testDBFetch() throws ExecutionException, InterruptedException { + CamelJDBCPropertyFactory factory = CamelJDBCPropertyFactory.basic() + .withComponentConfig("camel.component.jdbc.dataSource", "#class:org.postgresql.ds.PGSimpleDataSource") + .withEntry("user", "ckc") + .withEntry("password", "ckcDevel123") + .withEntry("url", jdbcService.jdbcUrl()) + .end() + .withDataSourceName("anotherName") + .withUseHeaderAsParameters(true) + .withTopics(TestUtils.getDefaultTestTopic(this.getClass())); + + runTest(factory); + + } +}