This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git


The following commit(s) were added to refs/heads/master by this push:
     new 022a45d  Added an AWS2-Kinesis sink example
022a45d is described below

commit 022a45d966b95d54102457340816b9dee845bf0d
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Fri Jan 15 18:09:23 2021 +0100

    Added an AWS2-Kinesis sink example
---
 aws2-kinesis/aws2-kinesis-sink/README.adoc         | 113 +++++++++++++++++++++
 .../CamelAWS2KinesisSinkConnector.properties       |  27 +++++
 2 files changed, 140 insertions(+)

diff --git a/aws2-kinesis/aws2-kinesis-sink/README.adoc b/aws2-kinesis/aws2-kinesis-sink/README.adoc
new file mode 100644
index 0000000..ca093b5
--- /dev/null
+++ b/aws2-kinesis/aws2-kinesis-sink/README.adoc
@@ -0,0 +1,113 @@
+# Camel-Kafka-connector AWS2 Kinesis Sink
+
+This is an example of the Camel-Kafka-connector AWS2-Kinesis Sink.
+
+## Standalone
+
+### What is needed
+
+- An AWS Kinesis stream
+- The AWS CLI installed locally
+- Some work on the AWS console
+
+### Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
+```
+
+### Setting up the needed bits and running the example
+
+You'll need to set up the `plugin.path` property in your Kafka installation.
+
+Open `$KAFKA_HOME/config/connect-standalone.properties`
+
+and set the `plugin.path` property to your chosen location.
+
+In this example we'll use `/home/oscerd/connectors/`.
+
+```
+> cd /home/oscerd/connectors/
+> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-kinesis-kafka-connector/0.7.0/camel-aws2-kinesis-kafka-connector-0.7.0-package.zip
+> unzip camel-aws2-kinesis-kafka-connector-0.7.0-package.zip
+```
+
+On the AWS console, create a Kinesis stream named `streamTest`.
+
+Now it's time to set up the connector.
+
+Open the AWS2 Kinesis configuration file, `config/CamelAWS2KinesisSinkConnector.properties`:
+
+```
+name=CamelAws2-kinesisSinkConnector
+connector.class=org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSinkConnector
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+
+camel.sink.path.streamName=streamTest
+camel.sink.endpoint.accessKey=xxxx
+camel.sink.endpoint.secretKey=yyyy
+camel.sink.endpoint.region=region
+```
+
+and add the correct credentials for AWS.
+
+Now you can run the example
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelAWS2KinesisSinkConnector.properties
+```
+
+Now send a message to the Kafka topic, for example:
+
+```
+> echo "hello there" | ./kafkacat -b localhost:9092 -H "CamelHeader.CamelAwsKinesisPartitionKey=partition-1" -t mytopic
+% Auto-selecting Producer mode (use -P or -C to override)
+> echo "hello there" | ./kafkacat -b localhost:9092 -H "CamelHeader.CamelAwsKinesisPartitionKey=partition-1" -t mytopic
+% Auto-selecting Producer mode (use -P or -C to override)
+```
+
+To verify that the records are present in the `streamTest` stream, we can use the AWS CLI.
+
+First we need to get a shard iterator:
+
+```
+> aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name streamTest
+{
+    "ShardIterator": "AAAAAAAAAAGxdqX2OPHzjl3szvOLjdl21ylngnoD9zW3PSvRZHvQu825c0TCgA/M4Z5/dzZzBIJ1JR6h4VF2kmqFsEHOHXQ7gBq1mqXsBxUdk8Xvj1EkzUIbi3tcQFdmXSgW0O+9oTIJZ5ljiWFAwd1Czx1BsiB2c2RcqKUz/nRJjNL5MQBKywKuDEcplfVh+C2NnOCFdKqIamH0KeuK0UXhSHK1ghlW"
+}
+```
+
+After this we'll need to perform a get-records operation:
+
+```
+> aws kinesis get-records --shard-iterator AAAAAAAAAAGxdqX2OPHzjl3szvOLjdl21ylngnoD9zW3PSvRZHvQu825c0TCgA/M4Z5/dzZzBIJ1JR6h4VF2kmqFsEHOHXQ7gBq1mqXsBxUdk8Xvj1EkzUIbi3tcQFdmXSgW0O+9oTIJZ5ljiWFAwd1Czx1BsiB2c2RcqKUz/nRJjNL5MQBKywKuDEcplfVh+C2NnOCFdKqIamH0KeuK0UXhSHK1ghlW
+{
+    "Records": [
+        {
+            "Data": "aGVsbG8gdGhlcmU=", 
+            "PartitionKey": "partition-1", 
+            "ApproximateArrivalTimestamp": 1610729857.904, 
+            "SequenceNumber": "49614584677004495689019783087056269304781414429070721026"
+        }, 
+        {
+            "Data": "aGVsbG8gdGhlcmU=", 
+            "PartitionKey": "partition-1", 
+            "ApproximateArrivalTimestamp": 1610729861.765, 
+            "SequenceNumber": "49614584677004495689019783087057478230601029333123334146"
+        }
+    ], 
+    "NextShardIterator": "AAAAAAAAAAFWEhvAPrJc6dctkUTv5cFSIIcaQshFYv5wtlofGWJfmi8NjQljI5B4xzdVTE23zik9sbx+G0+T8CxTXScStjWVcZMNRi0Gt11lE0a8a+WkzP5/Zmm8Gf6X6f3w5P/tNzRUFCQc+Tg7eNOeevjiyRdn0271qOtfk5gS7NVtSaSGq13CwV3FWcCN2FzE9F8K04+8YihNrvBNhcuFIU3jyBhY",
+    "MillisBehindLatest": 0
+}
+```
+
+As you can see, the stream now contains two records.
+
+
+
+
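
Note on the verification step in the README above: the `Data` field returned by `get-records` is Base64-encoded, so `aGVsbG8gdGhlcmU=` has to be decoded before it can be compared with the message that was sent. The sketch below is not part of the commit. The function names `shard_iterator` and `decode_data` are hypothetical helpers, and it assumes the AWS CLI and a `base64` tool that accepts `--decode` are on the `PATH`.

```shell
# Sketch only: these helper names are hypothetical, not part of the example repository.

# Print a TRIM_HORIZON iterator for one shard of a stream.
# --query and --output are global AWS CLI options; here they strip the JSON
# wrapper so that only the bare iterator string is printed.
shard_iterator() {
  aws kinesis get-shard-iterator \
    --stream-name "$1" \
    --shard-id "$2" \
    --shard-iterator-type TRIM_HORIZON \
    --query 'ShardIterator' --output text
}

# Decode one Base64-encoded Data value taken from a get-records response.
decode_data() {
  printf '%s' "$1" | base64 --decode
}

decode_data 'aGVsbG8gdGhlcmU='   # prints: hello there
```

With these helpers the two CLI calls can be chained as `aws kinesis get-records --shard-iterator "$(shard_iterator streamTest shardId-000000000000)"`, which avoids copying the long iterator string by hand.
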
diff --git a/aws2-kinesis/aws2-kinesis-sink/config/CamelAWS2KinesisSinkConnector.properties b/aws2-kinesis/aws2-kinesis-sink/config/CamelAWS2KinesisSinkConnector.properties
new file mode 100644
index 0000000..71b926e
--- /dev/null
+++ b/aws2-kinesis/aws2-kinesis-sink/config/CamelAWS2KinesisSinkConnector.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelAws2-kinesisSinkConnector
+connector.class=org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSinkConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+camel.sink.path.streamName=streamTest
+camel.sink.endpoint.accessKey=xxxx
+camel.sink.endpoint.secretKey=xxxx
+camel.sink.endpoint.region=region
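
Note on the properties file above: `xxxx` and `region` are placeholders that must be replaced with real AWS credentials and a real region before the connector is started. The sketch below is not part of the commit. It assumes a hypothetical helper, `find_placeholders`, that lists any placeholder lines still present in a connector configuration file.

```shell
# Sketch only: find_placeholders is a hypothetical helper, not part of the example repository.
# It prints (with line numbers) every endpoint property that still holds one of the
# placeholder values used in this example, and returns non-zero when none are left.
find_placeholders() {
  grep -nE '^camel\.sink\.endpoint\.((accessKey|secretKey)=(xxxx|yyyy)|region=region)$' "$1"
}
```

A possible use before launching Kafka Connect is `find_placeholders config/CamelAWS2KinesisSinkConnector.properties && echo "replace the placeholder values first"`.
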
