This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git
The following commit(s) were added to refs/heads/master by this push: new cef55e5 Camel-CassandraQL source example: Fixed by reflecting the change in the 3.5.0 camel-cassandraql component cef55e5 is described below commit cef55e5570168527b28fe27a492fbfbff657a084 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Oct 22 18:42:30 2020 +0200 Camel-CassandraQL source example: Fixed by reflecting the change in the 3.5.0 camel-cassandraql component --- cql/cql-source/README.adoc | 106 ++++++++++++++++++++- .../CamelCassandraQLSourceConnector.properties | 1 + 2 files changed, 103 insertions(+), 4 deletions(-) diff --git a/cql/cql-source/README.adoc b/cql/cql-source/README.adoc index 8c874e4..76362d0 100644 --- a/cql/cql-source/README.adoc +++ b/cql/cql-source/README.adoc @@ -24,14 +24,112 @@ Open the `$KAFKA_HOME/config/connect-standalone.properties` and set the `plugin.path` property to your choosen location -In this example we'll use `/home/oscerd/connectors/` +You'll need to build your connector starting from an archetype: + +``` +> mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.5.0 +[INFO] Scanning for projects... +[INFO] +[INFO] ------------------< org.apache.maven:standalone-pom >------------------- +[INFO] Building Maven Stub Project (No POM) 1 +[INFO] --------------------------------[ pom ]--------------------------------- +[INFO] +[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>> +[INFO] +[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<< +[INFO] +[INFO] +[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom --- +[INFO] Generating project in Interactive mode +Define value for property 'groupId': org.apache.camel.kafkaconnector.cql.extended +Define value for property 'artifactId': cql-extended +Define value for property 'version' 1.0-SNAPSHOT: : 0.5.0 +Define value for property 'package' org.apache.camel.kafkaconnector.cql.extended: : +Define value for property 'camel-kafka-connector-name': camel-cql-kafka-connector +[INFO] Using property: camel-kafka-connector-version = 0.5.0 +Confirm properties configuration: +groupId: org.apache.camel.kafkaconnector.cql.extended +artifactId: cql-extended +version: 0.5.0 +package: org.apache.camel.kafkaconnector.cql.extended +camel-kafka-connector-name: camel-cql-kafka-connector +camel-kafka-connector-version: 0.5.0 + Y: : Y +[INFO] ---------------------------------------------------------------------------- +[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.5.0 +[INFO] ---------------------------------------------------------------------------- +[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.cql.extended +[INFO] Parameter: artifactId, Value: cql-extended +[INFO] Parameter: version, Value: 0.5.0 +[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.cql.extended +[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector/cql/extended +[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.cql.extended +[INFO] Parameter: version, Value: 0.5.0 +[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.cql.extended +[INFO] Parameter: camel-kafka-connector-name, Value: camel-cql-kafka-connector +[INFO] Parameter: camel-kafka-connector-version, Value: 0.5.0 +[INFO] Parameter: artifactId, Value: cql-extended +[INFO] Project created from Archetype in dir: /home/oscerd/playground/cql-extended +[INFO] ------------------------------------------------------------------------ +[INFO] BUILD SUCCESS +[INFO] ------------------------------------------------------------------------ +[INFO] Total time: 55.314 s +[INFO] Finished at: 2020-10-22T18:06:34+02:00 +[INFO] ------------------------------------------------------------------------ +> cd /home/oscerd/playground/cql-extended +``` + +Import the cql-extended project in your favorite IDE and create the following class as RowConversionStrategy + +``` +package org.apache.camel.kafkaconnector.cql.extended; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import org.apache.camel.component.cassandra.ResultSetConversionStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RowConversionStrategy implements ResultSetConversionStrategy { + private static final Logger LOG = LoggerFactory.getLogger(RowConversionStrategy.class); + + @Override + public Object getBody(ResultSet resultSet) { + List<String> ret = new ArrayList<>(); + + Iterator<Row> iterator = resultSet.iterator(); + while (iterator.hasNext()) { + Row row = iterator.next(); + int id = row.getInt("id"); + String name = row.getString("name"); + ret.add("Row[" + String.valueOf(id) + ", " + name +"]"); + } + + return ret; + } +} +``` + +Now we need to build the connector: + +``` +> mvn clean package +``` + +In this example we'll use `/home/oscerd/connectors/` as plugin.path, but we'll need the generated zip from the previous build ``` > cd /home/oscerd/connectors/ -> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-cql-kafka-connector/0.5.0/camel-cql-kafka-connector-0.5.0-package.zip -> unzip camel-cql-kafka-connector-0.5.0-package.zip +> cp /home/oscerd/playground/cql-extended/target/cql-extended-0.5.0-package.zip . +> unzip cql-extended-0.5.0-package.zip ``` +and we're now ready to setting up the Cassandra cluster. + ## Setting up Apache Cassandra This examples require a running Cassandra instance, for simplicity the steps below show how to start Cassandra using Docker. First you'll need to run a Cassandra instance: @@ -95,6 +193,7 @@ camel.source.path.hosts=172.17.0.2 camel.source.path.port=9042 camel.source.path.keyspace=test camel.source.endpoint.cql=select * from users +camel.source.endpoint.resultSetConversionStrategy=#class:org.apache.camel.kafkaconnector.cql.extended.RowConversionStrategy ``` Now you can run the example @@ -109,5 +208,4 @@ On a different terminal run the kafka-consumer and you should see messages to Ka bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning [Row[1, oscerd]] ``` -You can verify the behavior through the following command diff --git a/cql/cql-source/config/CamelCassandraQLSourceConnector.properties b/cql/cql-source/config/CamelCassandraQLSourceConnector.properties index c56e032..691ed12 100644 --- a/cql/cql-source/config/CamelCassandraQLSourceConnector.properties +++ b/cql/cql-source/config/CamelCassandraQLSourceConnector.properties @@ -27,5 +27,6 @@ camel.source.path.port=9042 camel.source.path.keyspace=test camel.source.endpoint.cql=select * from users camel.source.endpoint.delay=10000 +camel.source.endpoint.resultSetConversionStrategy=#class:org.apache.camel.kafkaconnector.cql.extended.RowConversionStrategy