This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git
The following commit(s) were added to refs/heads/master by this push:
     new 38daafb  Added a JDBC sink connector example
38daafb is described below

commit 38daafbb0e6888950444ceeca2e58758b19fae06
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Tue Jan 5 12:07:51 2021 +0100

    Added a JDBC sink connector example
---
 jdbc/jdbc-sink/README.adoc                   | 155 +++++++++++++++++++++
 .../config/CamelJdbcSinkConnector.properties |  30 ++++
 2 files changed, 185 insertions(+)

diff --git a/jdbc/jdbc-sink/README.adoc b/jdbc/jdbc-sink/README.adoc
new file mode 100644
index 0000000..b92d740
--- /dev/null
+++ b/jdbc/jdbc-sink/README.adoc
@@ -0,0 +1,155 @@
= Camel-Kafka-connector JDBC Sink

This is an example of the Camel-Kafka-connector JDBC Sink.

== Standalone

=== What is needed

- A PostgreSQL instance running through Docker
- The PostgreSQL JDBC driver

=== Running Kafka

[source]
----
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
----

=== Download the connector package

Download the connector package zip and extract the content to a directory. In this example we'll use `/home/oscerd/connectors/`.

[source]
----
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-jdbc-kafka-connector/0.7.0/camel-jdbc-kafka-connector-0.7.0-package.zip
> unzip camel-jdbc-kafka-connector-0.7.0-package.zip
----

We also need the PostgreSQL JDBC driver for this example:

[source]
----
> cd /home/oscerd/connectors/camel-jdbc-kafka-connector/
> wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.14/postgresql-42.2.14.jar
----

=== Configuring Kafka Connect

You'll need to set the `plugin.path` property in your Kafka Connect configuration.

Open `$KAFKA_HOME/config/connect-standalone.properties` and set the `plugin.path` property to your chosen location:

[source]
----
...
plugin.path=/home/oscerd/connectors
...
----

=== Set up the Docker image

We'll need a running PostgreSQL instance.

The first step is starting it:

[source]
----
> docker run --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres
6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb
----

Take note of the container ID.
We now need to create the table we'll use, which is the following:

[source]
----
CREATE TABLE accounts (
    user_id serial PRIMARY KEY,
    username VARCHAR ( 50 ) UNIQUE NOT NULL,
    city VARCHAR ( 50 ) NOT NULL
);
----

We are now ready to create the table:

[source]
----
> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
psql (13.0 (Debian 13.0-1.pgdg100+1))
Type "help" for help.

postgres=# CREATE TABLE accounts (
postgres(#     user_id serial PRIMARY KEY,
postgres(#     username VARCHAR ( 50 ) UNIQUE NOT NULL,
postgres(#     city VARCHAR ( 50 ) NOT NULL
postgres(# );
----
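Alternatively, the same `CREATE TABLE` statement can be run non-interactively by passing it through `psql -c`. This is just a sketch of the step above, reusing the container ID returned by the `docker run` command:

[source]
----
> docker exec 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres -c "CREATE TABLE accounts (user_id serial PRIMARY KEY, username VARCHAR(50) UNIQUE NOT NULL, city VARCHAR(50) NOT NULL);"
----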
We also need to take note of the container IP:

[source]
----
> docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb
172.17.0.2
----

=== Set up the connectors

Open the JDBC configuration file at `$EXAMPLES/jdbc/jdbc-sink/config/CamelJdbcSinkConnector.properties`:

[source]
----
name=CamelJdbcSinkConnector
connector.class=org.apache.camel.kafkaconnector.jdbc.CamelJdbcSinkConnector
tasks.max=1

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

topics=mytopic

camel.component.jdbc.dataSource.user=postgres
camel.component.jdbc.dataSource.password=mysecretpassword
camel.component.jdbc.dataSource.serverName=172.17.0.2
camel.component.jdbc.dataSource=#class:org.postgresql.ds.PGSimpleDataSource
camel.sink.path.dataSourceName=default
camel.sink.endpoint.useHeadersAsParameters=true
----

and set `camel.component.jdbc.dataSource.serverName` to the container IP you noted above.

=== Running the example

Run Kafka Connect with the JDBC Sink connector:

[source]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $EXAMPLES/jdbc/jdbc-sink/config/CamelJdbcSinkConnector.properties
----

On a different terminal, run the kafkacat producer and send the following messages:

[source]
----
> echo "INSERT INTO accounts (username,city) VALUES (:?username,:?city)" | ./kafkacat -P -b localhost:9092 -t mytopic -H "CamelHeader.username=andrea" -H "CamelHeader.city=Roma"
> echo "INSERT INTO accounts (username,city) VALUES (:?username,:?city)" | ./kafkacat -P -b localhost:9092 -t mytopic -H "CamelHeader.username=John" -H "CamelHeader.city=New York"
----

The message body is the SQL statement to execute and, since `useHeadersAsParameters` is enabled, the `CamelHeader.*` headers supply the values of the `:?username` and `:?city` named parameters.

Now you can verify through psql that the records were inserted:

[source]
----
> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
psql (13.0 (Debian 13.0-1.pgdg100+1))
Type "help" for help.

postgres=# select * from accounts;
 user_id | username |   city
---------+----------+----------
       1 | andrea   | Roma
       2 | John     | New York
(2 rows)
----
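As an extra check, you can also ask the Kafka Connect REST API whether the connector and its task are in the `RUNNING` state. This is only a sketch and assumes the standalone worker uses the default REST port 8083:

[source]
----
> curl -s http://localhost:8083/connectors/CamelJdbcSinkConnector/status
----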
diff --git a/jdbc/jdbc-sink/config/CamelJdbcSinkConnector.properties b/jdbc/jdbc-sink/config/CamelJdbcSinkConnector.properties
new file mode 100644
index 0000000..06965bb
--- /dev/null
+++ b/jdbc/jdbc-sink/config/CamelJdbcSinkConnector.properties
@@ -0,0 +1,30 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name=CamelJdbcSinkConnector
connector.class=org.apache.camel.kafkaconnector.jdbc.CamelJdbcSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

topics=mytopic

camel.component.jdbc.dataSource.user=postgres
camel.component.jdbc.dataSource.password=mysecretpassword
camel.component.jdbc.dataSource.serverName=172.17.0.2
camel.component.jdbc.dataSource=#class:org.postgresql.ds.PGSimpleDataSource
camel.sink.path.dataSourceName=default
camel.sink.endpoint.useHeadersAsParameters=true
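Once you're done with the example, you may want to clean up the PostgreSQL instance. A minimal sketch, assuming the container is still named `some-postgres` as in the `docker run` step above:

[source]
----
> docker stop some-postgres
> docker rm some-postgres
----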