This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch cron-example in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git
commit 5751c017e3f564c9b0f419747c16b2dc3267ec92 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Jan 21 13:47:23 2021 +0100 Added a Cron source connector example --- cron/cron-source/README.adoc | 107 +++++++++++++++++++++ .../config/CamelCronSourceConnector.properties | 28 ++++++ 2 files changed, 135 insertions(+) diff --git a/cron/cron-source/README.adoc b/cron/cron-source/README.adoc new file mode 100644 index 0000000..fc5e6da --- /dev/null +++ b/cron/cron-source/README.adoc @@ -0,0 +1,107 @@ += Camel-Kafka-connector Cron Source + +This is an example for Camel-Kafka-connector Cron Source + +== Standalone + +=== Running Kafka + +[source] +---- +$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties +$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties +$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic +---- + +=== Download the connector package + +Download the connector package zip and extract the content to a directory. In this example we'll use `/home/oscerd/connectors/` + +[source] +---- +> cd /home/oscerd/connectors/ +> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-cron-kafka-connector/0.7.0/camel-cron-kafka-connector-0.7.0-package.zip +> unzip camel-cron-kafka-connector-0.7.0-package.zip +---- + +In 0.7.0, you'll also need to add the camel-quartz and quartz dependencies. + +[source] +---- +> cd /home/oscerd/connectors/camel-cron-kafka-connector +> wget https://repo1.maven.org/maven2/org/apache/camel/camel-quartz/3.7.0/camel-quartz-3.7.0.jar +> wget https://repo1.maven.org/maven2/org/quartz-scheduler/quartz/2.3.2/quartz-2.3.2.jar +---- + +We are now ready to configure Kafka Connect. + +=== Configuring Kafka Connect + +You'll need to set up the `plugin.path` property in your Kafka Connect configuration. + +Open `$KAFKA_HOME/config/connect-standalone.properties` and set the `plugin.path` property to your chosen location: + +[source] +---- +...
+plugin.path=/home/oscerd/connectors +... +---- + +=== Set up the connectors + +Open the Cron configuration file at `$EXAMPLES/cron/cron-source/config/CamelCronSourceConnector.properties` + +[source] +---- +name=CamelCronSourceConnector +connector.class=org.apache.camel.kafkaconnector.cron.CamelCronSourceConnector +tasks.max=1 + +key.converter=org.apache.kafka.connect.storage.StringConverter +value.converter=org.apache.kafka.connect.storage.StringConverter + +topics=mytopic + +camel.source.endpoint.schedule=0/5+*+*+*+*+? +camel.source.path.name=cron-timer +---- + +The `schedule` option is a cron expression written with `+` in place of spaces; here it fires every 5 seconds. + +The `name` option sets the trigger name, `cron-timer` in this example. + +=== Running the example + +Run Kafka Connect with the Cron Source connector: + +[source] +---- +$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $EXAMPLES/cron/cron-source/config/CamelCronSourceConnector.properties +---- + +The connector produces one message every 5 seconds, with the trigger details in the headers and an empty value. + +In a different terminal, run the kafkacat consumer: + +[source] +---- +./kafkacat -b localhost:9092 -t mytopic -f 'Headers: %h: Message value: %s\n' +% Auto-selecting Consumer mode (use -P or -C to override) +Headers: CamelHeader.fireTime=2021-01-21T13:15:30.255Z,CamelHeader.jobRunTime=9,CamelHeader.nextFireTime=2021-01-21T13:15:35.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:15:30.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: 
CamelHeader.fireTime=2021-01-21T13:15:35.001Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:15:40.000Z,CamelHeader.previousFireTime=2021-01-21T13:15:30.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:15:35.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: CamelHeader.fireTime=2021-01-21T13:15:40.002Z,CamelHeader.jobRunTime=0,CamelHeader.nextFireTime=2021-01-21T13:15:45.000Z,CamelHeader.previousFireTime=2021-01-21T13:15:35.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:15:40.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: CamelHeader.fireTime=2021-01-21T13:15:45.002Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:15:50.000Z,CamelHeader.previousFireTime=2021-01-21T13:15:40.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:15:45.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: CamelHeader.fireTime=2021-01-21T13:15:50.001Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:15:55.000Z,CamelHeader.previousFireTime=2021-01-21T13:15:45.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:15:50.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: 
CamelHeader.fireTime=2021-01-21T13:15:55.001Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:16:00.000Z,CamelHeader.previousFireTime=2021-01-21T13:15:50.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:15:55.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: CamelHeader.fireTime=2021-01-21T13:16:00.002Z,CamelHeader.jobRunTime=-1,CamelHeader.nextFireTime=2021-01-21T13:16:05.000Z,CamelHeader.previousFireTime=2021-01-21T13:15:55.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:00.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: CamelHeader.fireTime=2021-01-21T13:16:05.001Z,CamelHeader.jobRunTime=-1,CamelHeader.nextFireTime=2021-01-21T13:16:10.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:00.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:05.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: CamelHeader.fireTime=2021-01-21T13:16:10.002Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:16:15.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:05.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:10.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: 
CamelHeader.fireTime=2021-01-21T13:16:15.002Z,CamelHeader.jobRunTime=0,CamelHeader.nextFireTime=2021-01-21T13:16:20.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:10.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:15.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: CamelHeader.fireTime=2021-01-21T13:16:20.001Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:16:25.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:15.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:20.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: CamelHeader.fireTime=2021-01-21T13:16:25.001Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:16:30.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:20.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:25.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: CamelHeader.fireTime=2021-01-21T13:16:30.001Z,CamelHeader.jobRunTime=-1,CamelHeader.nextFireTime=2021-01-21T13:16:35.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:25.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:30.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: 
CamelHeader.fireTime=2021-01-21T13:16:35.001Z,CamelHeader.jobRunTime=0,CamelHeader.nextFireTime=2021-01-21T13:16:40.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:30.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:35.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +Headers: CamelHeader.fireTime=2021-01-21T13:16:40.001Z,CamelHeader.jobRunTime=0,CamelHeader.nextFireTime=2021-01-21T13:16:45.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:35.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:40.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: +% Reached end of topic mytopic [0] at offset 15 +---- diff --git a/cron/cron-source/config/CamelCronSourceConnector.properties b/cron/cron-source/config/CamelCronSourceConnector.properties new file mode 100644 index 0000000..ba55dbd --- /dev/null +++ b/cron/cron-source/config/CamelCronSourceConnector.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +name=CamelCronSourceConnector +connector.class=org.apache.camel.kafkaconnector.cron.CamelCronSourceConnector +tasks.max=1 + +key.converter=org.apache.kafka.connect.storage.StringConverter +value.converter=org.apache.kafka.connect.storage.StringConverter + +topics=mytopic + +camel.source.endpoint.schedule=0/5+*+*+*+*+? +camel.source.path.name=cron-timer
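
Reviewer note on the schedule value: the properties file above sets `camel.source.endpoint.schedule=0/5+*+*+*+*+?`. Assuming the usual Camel convention that `+` stands in for a space in endpoint options, the sketch below reads the relevant properties and recovers the six-field cron expression. The helper names `read_properties` and `schedule_to_cron` are hypothetical and exist only for this illustration; they are not part of camel-kafka-connector.

```python
# Sketch only: helper names are hypothetical, not part of camel-kafka-connector.

def read_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def schedule_to_cron(schedule: str) -> str:
    """Replace the '+' separators with spaces (assumed Camel convention)."""
    return schedule.replace("+", " ")


# Excerpt of the connector configuration from the commit above.
EXAMPLE = """\
topics=mytopic
camel.source.endpoint.schedule=0/5+*+*+*+*+?
camel.source.path.name=cron-timer
"""

props = read_properties(EXAMPLE)
cron = schedule_to_cron(props["camel.source.endpoint.schedule"])
fields = cron.split()

print(cron)  # prints: 0/5 * * * * ?
```

The first field, `0/5`, is the seconds field, which matches the 5-second spacing of the `scheduledFireTime` headers in the kafkacat output.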