This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch infinispan in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git
commit 67f8cf9e9abbae77d951f598c119b2de63616b0c Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Oct 1 08:30:58 2020 +0200 Added an Infinispan Sink Example --- infinispan/infinispan-sink/README.adoc | 195 +++++++++++++++++++++ .../config/CamelInfinispanSinkConnector.properties | 26 +++ 2 files changed, 221 insertions(+) diff --git a/infinispan/infinispan-sink/README.adoc b/infinispan/infinispan-sink/README.adoc new file mode 100644 index 0000000..afc6899 --- /dev/null +++ b/infinispan/infinispan-sink/README.adoc @@ -0,0 +1,195 @@ +# Camel-Kafka-connector Infinispan Sink + +This is an example for Camel-Kafka-connector Infinispan Sink + +## Standalone + +### What is needed + +- An Infinispan instance + +### Setting up Infinispan + +As a first step you need to download the Infinispan Server with version 11.0.3.Final. + +Infinispan 11.x is secured by default. For the purpose of this example we'll remove the security check, so we won't need any authentication. + +If you have Infinispan unzipped in $INFINISPAN_HOME, you'll need to: + +- Edit $INFINISPAN_HOME/server/conf/infinispan.xml + +and the file content should be + +``` +<infinispan + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:infinispan:config:11.0 https://infinispan.org/schemas/infinispan-config-11.0.xsd + urn:infinispan:server:11.0 https://infinispan.org/schemas/infinispan-server-11.0.xsd" + xmlns="urn:infinispan:config:11.0" + xmlns:server="urn:infinispan:server:11.0"> + + <cache-container name="default" statistics="true"> + <transport cluster="${infinispan.cluster.name}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/> + </cache-container> + + <server xmlns="urn:infinispan:server:11.0"> + <interfaces> + <interface name="public"> + <inet-address value="${infinispan.bind.address:127.0.0.1}"/> + </interface> + </interfaces> + + <socket-bindings default-interface="public" port-offset="${infinispan.socket.binding.port-offset:0}"> 
<socket-binding name="default" port="${infinispan.bind.port:11222}"/> + <socket-binding name="memcached" port="11221"/> + </socket-bindings> + + <endpoints socket-binding="default"> + <hotrod-connector name="hotrod"/> + <rest-connector name="rest"/> + </endpoints> + </server> +</infinispan> +``` + +Now we need to create a cache, since the default cache is not available anymore out of the box with Infinispan 11. + +You can now start your server + +``` +> $INFINISPAN_HOME/bin/server.sh +bin/server.sh +06:57:51,378 INFO (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 25.252-b09 +06:57:51,395 INFO (main) [BOOT] JVM arguments = [-Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-11.0.3.Final] +06:57:51,396 INFO (main) [BOOT] PID = 9678 +06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting +06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080017: Server configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/infinispan.xml +06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/log4j2.xml +06:57:51,959 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory' +06:57:51,960 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory' +06:57:51,961 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory' +06:57:51,961 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory' +06:57:52,367 WARN (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and 
planned for removal +06:57:52,919 INFO (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Corona Extra' 11.0.3.Final +06:57:52,921 INFO (main) [org.infinispan.CONTAINER] ISPN000389: Loaded global state, version=11.0.3.Final timestamp=2020-09-30T21:04:46.511Z +06:57:53,046 INFO (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp +06:57:55,138 INFO (main) [org.jgroups.protocols.pbcast.GMS] ghost-35169: no members discovered after 2001 ms: creating cluster as coordinator +06:57:55,150 INFO (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-35169|0] (1) [ghost-35169] +06:57:55,156 INFO (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-35169, physical addresses are [192.168.1.15:7800] +06:57:55,810 INFO (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager +``` + +So, to create the cache, you'll need to run + +``` +> $INFINISPAN_HOME/bin/cli.sh +[disconnected]> connect +[ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache +[ghost-35169@cluster//containers/default]> describe caches/mycache +{ + "distributed-cache" : { + "mode" : "SYNC", + "remote-timeout" : 17500, + "state-transfer" : { + "timeout" : 60000 + }, + "transaction" : { + "mode" : "NONE" + }, + "locking" : { + "concurrency-level" : 1000, + "acquire-timeout" : 15000, + "striping" : false + }, + "statistics" : true + } +} +``` +Now we should be ready to work on this. 
+ +### Running Kafka + +``` +$KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties +$KAFKA_HOME/bin/kafka-server-start.sh config/server.properties +$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic +``` + +## Setting up the needed bits and running the example + +You'll need to set up the plugin.path property in your Kafka installation + +Open the `$KAFKA_HOME/config/connect-standalone.properties` + +and set the `plugin.path` property to your chosen location + +In this example we'll use `/home/oscerd/connectors/` + +``` +> cd /home/oscerd/connectors/ +> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.5.0/camel-infinispan-kafka-connector-0.5.0-package.zip +> unzip camel-infinispan-kafka-connector-0.5.0-package.zip +``` + +Now it's time to set up the connector + +Open the Infinispan sink configuration file + +``` +name=CamelInfinispanSinkConnector +connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSinkConnector +key.converter=org.apache.kafka.connect.storage.StringConverter +value.converter=org.apache.kafka.connect.storage.StringConverter + +topics=mytopic + +camel.sink.endpoint.hosts=localhost +camel.sink.path.cacheName=mycache +``` + +Now you can run the example + +``` +$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSinkConnector.properties +``` + +On a different terminal run the kafka-producer and send messages to your Kafka Broker. + +``` +bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopic +Kafka to Infinispan message 1 +Kafka to Infinispan message 2 +``` + +You should see the stats of the cache changing. 
You can check this by running + +``` +> $INFINISPAN_HOME/bin/cli.sh +[disconnected]> connect +[ghost-35169@cluster//containers/default]> cache mycache +[ghost-35169@cluster//containers/default/caches/mycache]> stats +{ + "total_number_of_entries" : 0, + "off_heap_memory_used" : 0, + "time_since_start" : 350, + "time_since_reset" : 350, + "stores" : 0, + "current_number_of_entries" : 0, + "data_memory_used" : 0, + "misses" : 0, + "remove_hits" : 0, + "remove_misses" : 0, + "evictions" : 0, + "average_read_time" : 0, + "average_read_time_nanos" : 0, + "average_write_time" : 0, + "average_write_time_nanos" : 0, + "average_remove_time" : 0, + "average_remove_time_nanos" : 0, + "required_minimum_number_of_nodes" : 1, + "current_number_of_entries_in_memory" : 0, + "retrievals" : 0, + "hits" : 0 +} +``` + diff --git a/infinispan/infinispan-sink/config/CamelInfinispanSinkConnector.properties b/infinispan/infinispan-sink/config/CamelInfinispanSinkConnector.properties new file mode 100644 index 0000000..260c7ed --- /dev/null +++ b/infinispan/infinispan-sink/config/CamelInfinispanSinkConnector.properties @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +name=CamelInfinispanSinkConnector +connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSinkConnector +key.converter=org.apache.kafka.connect.storage.StringConverter +value.converter=org.apache.kafka.connect.storage.StringConverter + +topics=mytopic + +camel.sink.endpoint.hosts=localhost +camel.sink.path.cacheName=mycache