This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch kafka-s3-openshift-example
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git

commit ae05598eb7e8ceb80b51ae58b3f859086daa050d
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Thu Nov 16 11:55:46 2023 +0100

    Kafka-S3 Example with Strimzi and Camel K operator on openshift

    Signed-off-by: Andrea Cosentino <anco...@gmail.com>
---
 camel-k/kafka-s3/README.adoc                       | 159 +++++++++++++++++++++
 .../kafka-s3/kafka-not-secured-sink.kamelet.yaml   |  90 ++++++++++++
 .../kafka-s3/kafka-not-secured-source.kamelet.yaml | 102 +++++++++++++
 .../main.tf                                        |   4 +-
 4 files changed, 353 insertions(+), 2 deletions(-)

diff --git a/camel-k/kafka-s3/README.adoc b/camel-k/kafka-s3/README.adoc
new file mode 100644
index 0000000..c81772d
--- /dev/null
+++ b/camel-k/kafka-s3/README.adoc
@@ -0,0 +1,159 @@
+== Kafka to S3 KameletBinding example
+
+In this sample you'll use the Strimzi Operator and the Camel K Operator on OpenShift Cloud
+
+=== Set up the Strimzi Operator
+
+For the purpose of this demo we don't need authentication, so we can simply install the Strimzi operator.
+
+Install the operator in a namespace of your choice.
+
+Once the operator installation completes, create a Kafka instance (default) and a Kafka Topic (default, named my-topic)
+
+=== Set up the Camel K Operator
+
+In the same namespace, install the Camel K operator from Apache (choose stable-1.10 as the channel)
+
+Once the installation completes, we are ready to go.
+
+=== Prepare a Bucket on AWS S3
+
+Create a bucket in your AWS account and choose a specific region. You can do that directly, or through the AWS CLI
+
+```bash
+aws s3api create-bucket --bucket strimzi-bucket --region eu-west-1 --create-bucket-configuration LocationConstraint=eu-west-1
+```
+
+=== Install Custom Kamelets
+
+Make sure you are in the same namespace as the installed operator
+
+```bash
+oc project <namespace_name>
+```
+
+For the purpose of this demo, we need to install the two custom Kamelets not included in the catalog for 1.10.4
+
+```bash
+> oc apply -f kafka-not-secured-sink.kamelet.yaml
+kamelet.camel.apache.org/kafka-not-secured-sink created
+```
+
+Same for the source
+
+```bash
+> oc apply -f kafka-not-secured-source.kamelet.yaml
+kamelet.camel.apache.org/kafka-not-secured-source created
+```
+
+=== Prepare KameletBinding
+
+Now we need to set up the KameletBinding
+
+First of all, let's recover the IP of the bootstrap service, needed to connect to the Strimzi cluster
+
+```bash
+oc describe service my-cluster-kafka-bootstrap
+```
+
+Copy the IP; the port will be 9092.
+
+Open kafka-s3.yaml and timer-kafka.yaml
+
+and set the bootstrapServers field to <IP>:9092;
+
+the topic will be "my-topic".
+
+For the S3 part, in the kafka-s3.yaml file just set the correct accessKey, secretKey and region (eu-west-1), and use strimzi-bucket as bucketNameOrArn
+
+=== Run
+
+We can now run the two KameletBindings
+
+```bash
+> oc apply -f timer-kafka.yaml
+kameletbinding.camel.apache.org/timer-kafka-binding created
+> oc apply -f kafka-s3.yaml
+kameletbinding.camel.apache.org/kafka-s3-binding created
+```
+
+You can watch the new pods being spun up
+
+```bash
+[oscerd@ghost kafka-s3]$ oc get pods -w
+NAME                                                READY   STATUS    RESTARTS   AGE
+camel-k-operator-bd56fcd94-mvsjp                    1/1     Running   0          68m
+my-cluster-entity-operator-79bcdb57f5-6jlbj         3/3     Running   0          56m
+my-cluster-kafka-0                                  1/1     Running   0          56m
+my-cluster-kafka-1                                  1/1     Running   0          56m
+my-cluster-kafka-2                                  1/1     Running   0          56m
+my-cluster-zookeeper-0                              1/1     Running   0          56m
+my-cluster-zookeeper-1                              1/1     Running   0          56m
+my-cluster-zookeeper-2                              1/1     Running   0          56m
+strimzi-cluster-operator-v0.38.0-7b7447c99f-v52tl   1/1     Running   0          68m
+timer-kafka-binding-9485d8cb9-9mr7s                 1/1     Running   0          51s
+```
+
+At some point both bindings will be in the Running state
+
+Let's look at the logs:
+
+```bash
+> oc logs timer-kafka-binding..
+2023-11-16 10:43:03,530 INFO [org.apa.cam.k.Runtime] (main) Apache Camel K Runtime 1.15.2
+2023-11-16 10:43:03,552 INFO [org.apa.cam.qua.cor.CamelBootstrapRecorder] (main) Bootstrap runtime: org.apache.camel.quarkus.main.CamelMainRuntime
+2023-11-16 10:43:03,555 INFO [org.apa.cam.mai.MainSupport] (main) Apache Camel (Main) 3.18.3 is starting
+2023-11-16 10:43:03,604 INFO [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='camel-k-embedded-flow', language='yaml', type='source', location='file:/etc/camel/sources/camel-k-embedded-flow.yaml', }
+2023-11-16 10:43:03,664 INFO [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='kafka-not-secured-sink', language='yaml', type='source', location='file:/etc/camel/sources/kafka-not-secured-sink.yaml', }
+2023-11-16 10:43:03,684 INFO [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='timer-source', language='yaml', type='source', location='file:/etc/camel/sources/timer-source.yaml', }
+2023-11-16 10:43:03,925 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.18.3 (camel-1) is starting
+2023-11-16 10:43:04,131 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Routes startup (started:3)
+2023-11-16 10:43:04,132 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Started binding (kamelet://timer-source/source)
+2023-11-16 10:43:04,132 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Started source (timer://tick)
+2023-11-16 10:43:04,132 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Started sink (kamelet://source)
+2023-11-16 10:43:04,133 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.18.3 (camel-1) started in 334ms (build:0ms init:127ms start:207ms)
+2023-11-16 10:43:04,149 INFO [io.quarkus] (main) camel-k-integration 1.10.4 on JVM (powered by Quarkus 2.13.4.Final) started in 2.815s.
+2023-11-16 10:43:04,149 INFO [io.quarkus] (main) Profile prod activated.
+2023-11-16 10:43:04,149 INFO [io.quarkus] (main) Installed features: [camel-bean, camel-core, camel-k-core, camel-k-runtime, camel-kafka, camel-kamelet, camel-kubernetes, camel-timer, camel-yaml-dsl, cdi, kafka-client, kubernetes-client, security]
+```
+
+For Kafka-s3 instead
+
+```bash
+> oc logs kafka-s3-binding..
+2023-11-16 10:45:53,138 INFO [org.apa.cam.k.Runtime] (main) Apache Camel K Runtime 1.15.2
+2023-11-16 10:45:53,546 INFO [org.apa.cam.qua.cor.CamelBootstrapRecorder] (main) Bootstrap runtime: org.apache.camel.quarkus.main.CamelMainRuntime
+2023-11-16 10:45:53,552 INFO [org.apa.cam.mai.MainSupport] (main) Apache Camel (Main) 3.18.3 is starting
+2023-11-16 10:45:53,607 INFO [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='camel-k-embedded-flow', language='yaml', type='source', location='file:/etc/camel/sources/camel-k-embedded-flow.yaml', }
+2023-11-16 10:45:53,658 INFO [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='aws-s3-sink', language='yaml', type='source', location='file:/etc/camel/sources/aws-s3-sink.yaml', }
+2023-11-16 10:45:53,664 INFO [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='kafka-not-secured-source', language='yaml', type='source', location='file:/etc/camel/sources/kafka-not-secured-source.yaml', }
+2023-11-16 10:45:53,955 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.18.3 (camel-1) is starting
+2023-11-16 10:45:55,253 INFO [org.apa.cam.com.kaf.KafkaConsumer] (main) Starting Kafka consumer on topic: my-topic with breakOnFirstError: false
+2023-11-16 10:45:55,271 INFO [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[my-topic]) Connecting Kafka consumer thread ID my-topic-Thread 0 with poll timeout of 5000 ms
+2023-11-16 10:45:55,276 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Routes startup (started:3)
+2023-11-16 10:45:55,277 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Started binding (kamelet://kafka-not-secured-source/source)
+2023-11-16 10:45:55,279 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Started source (kafka://my-topic)
+2023-11-16 10:45:55,279 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Started sink (kamelet://source)
+2023-11-16 10:45:55,282 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.18.3 (camel-1) started in 1s467ms (build:0ms init:141ms start:1s326ms)
+2023-11-16 10:45:55,288 INFO [io.quarkus] (main) camel-k-integration 1.10.4 on JVM (powered by Quarkus 2.13.4.Final) started in 3.968s.
+2023-11-16 10:45:55,288 INFO [io.quarkus] (main) Profile prod activated.
+2023-11-16 10:45:55,288 INFO [io.quarkus] (main) Installed features: [camel-aws2-s3, camel-bean, camel-core, camel-k-core, camel-k-runtime, camel-kafka, camel-kamelet, camel-kubernetes, camel-yaml-dsl, cdi, kafka-client, kubernetes-client, security]
+2023-11-16 10:45:55,497 INFO [org.apa.cam.com.kaf.con.sup.ResumeStrategyFactory] (Camel (camel-1) thread #1 - KafkaConsumer[my-topic]) Using NO-OP resume strategy
+2023-11-16 10:45:55,498 INFO [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[my-topic]) Subscribing my-topic-Thread 0 to topic my-topic
+```
+
+Both of them are running.
+
+Let's now check the content of the s3 bucket.
+
+```bash
+> aws s3 ls s3://strimzi-bucket --recursive --human-readable --summarize
+2023-11-16 11:52:32   10 Bytes 701027CE70E080F-0000000000000000
+2023-11-16 11:53:01   10 Bytes 701027CE70E080F-0000000000000001
+2023-11-16 11:53:31   10 Bytes 701027CE70E080F-0000000000000002
+2023-11-16 11:54:02   10 Bytes 701027CE70E080F-0000000000000003
+2023-11-16 11:54:31   10 Bytes 701027CE70E080F-0000000000000004
+
+Total Objects: 5
+   Total Size: 50 Bytes
+```
diff --git a/camel-k/kafka-s3/kafka-not-secured-sink.kamelet.yaml b/camel-k/kafka-s3/kafka-not-secured-sink.kamelet.yaml
new file mode 100644
index 0000000..d7ba763
--- /dev/null
+++ b/camel-k/kafka-s3/kafka-not-secured-sink.kamelet.yaml
@@ -0,0 +1,90 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: kafka-not-secured-sink
+  annotations:
+    camel.apache.org/kamelet.support.level: "Stable"
+    camel.apache.org/catalog.version: "0.9.4"
+    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0idXRmLTgiPz4NCjwhLS0gR2VuZXJhdG9yOiBBZG9iZSBJbGx1c3RyYXRvciAxOS4wLjAsIFNWRyBFeHBvcnQgUGx1Zy1JbiAuIFNWRyBWZXJzaW9uOiA2LjAwIEJ1aWxkIDApICAtLT4NCjxzdmcgdmVyc2lvbj0iMS4xIiBpZD0iTGF5ZXJfMSIgeG1sbnM9Imh0dHA6Ly93d3cudzMub3JnLzIwMDAvc3ZnIiB4bWxuczp4bGluaz0iaHR0cDovL3d3dy53My5vcmcvMTk5OS94bGluayIgeD0iMHB4IiB5PSIwcHgiDQoJIHZpZXdCb3g9IjAgMCA1MDAgNTAwIiBzdHlsZT0iZW5hYmxlLWJhY2tncm91bmQ6bmV3IDAgMCA1MD [...]
+    camel.apache.org/provider: "Apache Software Foundation"
+    camel.apache.org/kamelet.group: "Kafka"
+  labels:
+    camel.apache.org/kamelet.type: "sink"
+spec:
+  definition:
+    title: "Kafka Not Secured Sink"
+    description: |-
+      Send data to Kafka topics on an insecure broker.
+
+      The Kamelet is able to understand the following headers to be set:
+
+      - `key` / `ce-key`: as message key
+
+      - `partition-key` / `ce-partitionkey`: as message partition key
+
+      Both the headers are optional.
+    required:
+      - topic
+      - bootstrapServers
+    type: object
+    properties:
+      topic:
+        title: Topic Names
+        description: Comma separated list of Kafka topic names
+        type: string
+      bootstrapServers:
+        title: Bootstrap Servers
+        description: Comma separated list of Kafka Broker URLs
+        type: string
+  dependencies:
+  - "camel:core"
+  - "camel:kafka"
+  - "camel:kamelet"
+  template:
+    from:
+      uri: "kamelet:source"
+      steps:
+      - choice:
+          when:
+          - simple: "${header[key]}"
+            steps:
+            - set-header:
+                name: kafka.KEY
+                simple: "${header[key]}"
+          - simple: "${header[ce-key]}"
+            steps:
+            - set-header:
+                name: kafka.KEY
+                simple: "${header[ce-key]}"
+      - choice:
+          when:
+          - simple: "${header[partition-key]}"
+            steps:
+            - set-header:
+                name: kafka.PARTITION_KEY
+                simple: "${header[partition-key]}"
+          - simple: "${header[ce-partitionkey]}"
+            steps:
+            - set-header:
+                name: kafka.PARTITION_KEY
+                simple: "${header[ce-partitionkey]}"
+      - to:
+          uri: "kafka:{{topic}}"
+          parameters:
+            brokers: "{{bootstrapServers}}"
diff --git a/camel-k/kafka-s3/kafka-not-secured-source.kamelet.yaml b/camel-k/kafka-s3/kafka-not-secured-source.kamelet.yaml
new file mode 100644
index 0000000..a84538b
--- /dev/null
+++ b/camel-k/kafka-s3/kafka-not-secured-source.kamelet.yaml
@@ -0,0 +1,102 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: kafka-not-secured-source
+  annotations:
+    camel.apache.org/kamelet.support.level: "Stable"
+    camel.apache.org/catalog.version: "0.9.4"
+    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0idXRmLTgiPz4NCjwhLS0gR2VuZXJhdG9yOiBBZG9iZSBJbGx1c3RyYXRvciAxOS4wLjAsIFNWRyBFeHBvcnQgUGx1Zy1JbiAuIFNWRyBWZXJzaW9uOiA2LjAwIEJ1aWxkIDApICAtLT4NCjxzdmcgdmVyc2lvbj0iMS4xIiBpZD0iTGF5ZXJfMSIgeG1sbnM9Imh0dHA6Ly93d3cudzMub3JnLzIwMDAvc3ZnIiB4bWxuczp4bGluaz0iaHR0cDovL3d3dy53My5vcmcvMTk5OS94bGluayIgeD0iMHB4IiB5PSIwcHgiDQoJIHZpZXdCb3g9IjAgMCA1MDAgNTAwIiBzdHlsZT0iZW5hYmxlLWJhY2tncm91bmQ6bmV3IDAgMCA1MD [...]
+    camel.apache.org/provider: "Apache Software Foundation"
+    camel.apache.org/kamelet.group: "Kafka"
+    camel.apache.org/keda.type: "kafka"
+  labels:
+    camel.apache.org/kamelet.type: "source"
+spec:
+  definition:
+    title: "Kafka Not Secured Source"
+    description: |-
+      Receive data from Kafka topics on an insecure broker.
+    required:
+      - topic
+      - bootstrapServers
+    type: object
+    properties:
+      topic:
+        title: Topic Names
+        description: Comma separated list of Kafka topic names
+        type: string
+        x-descriptors:
+        - urn:keda:metadata:topic
+        - urn:keda:required
+      bootstrapServers:
+        title: Bootstrap Servers
+        description: Comma separated list of Kafka Broker URLs
+        type: string
+        x-descriptors:
+        - urn:keda:metadata:bootstrapServers
+        - urn:keda:required
+      autoCommitEnable:
+        title: Auto Commit Enable
+        description: If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer
+        type: boolean
+        default: true
+        x-descriptors:
+        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+      allowManualCommit:
+        title: Allow Manual Commit
+        description: Whether to allow doing manual commits
+        type: boolean
+        default: false
+        x-descriptors:
+        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+      pollOnError:
+        title: Poll On Error Behavior
+        description: What to do if kafka threw an exception while polling for new messages. There are 5 enums and the value can be one of DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP
+        type: string
+        default: "ERROR_HANDLER"
+      autoOffsetReset:
+        title: Auto Offset Reset
+        description: What to do when there is no initial offset. There are 3 enums and the value can be one of latest, earliest, none
+        type: string
+        default: "latest"
+        x-descriptors:
+        - urn:keda:metadata:offsetResetPolicy
+      consumerGroup:
+        title: Consumer Group
+        description: A string that uniquely identifies the group of consumers to which this source belongs
+        type: string
+        example: "my-group-id"
+        x-descriptors:
+        - urn:keda:metadata:consumerGroup
+        - urn:keda:required
+  dependencies:
+  - "camel:kafka"
+  - "camel:kamelet"
+  template:
+    from:
+      uri: "kafka:{{topic}}"
+      parameters:
+        brokers: "{{bootstrapServers}}"
+        autoCommitEnable: "{{autoCommitEnable}}"
+        allowManualCommit: "{{allowManualCommit}}"
+        pollOnError: "{{pollOnError}}"
+        autoOffsetReset: "{{autoOffsetReset}}"
+        groupId: "{{?consumerGroup}}"
+      steps:
+      - to: "kamelet:sink"
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf b/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf
index ba4fe6d..ecc6592 100644
--- a/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf
@@ -33,13 +33,13 @@ provider "azurerm" {
 
 # Resource group.
 resource "azurerm_resource_group" "example" {
-  name     = "example-rg"
+  name     = "example-test12345-rg"
   location = "West Europe"
 }
 
 # Eventhubs Namepsace.
 resource "azurerm_eventhub_namespace" "example" {
-  name                = "example-namespace"
+  name                = "example-test12345-namespace"
   location            = azurerm_resource_group.example.location
   resource_group_name = azurerm_resource_group.example.name
   sku                 = "Standard"
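
Note on the README above: it asks the reader to edit kafka-s3.yaml, but that file is not part of this diff. The following is only a minimal sketch of what such a KameletBinding might look like, not the file from the repository. It assumes the binding name kafka-s3-binding (taken from the `oc apply` output), the source properties declared by kafka-not-secured-source, and the sink property names the README lists for aws-s3-sink. The `<IP>`, `<access_key>` and `<secret_key>` values are placeholders.

```yaml
# Hypothetical kafka-s3.yaml; the real file is not shown in this commit.
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: kafka-s3-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: kafka-not-secured-source
    properties:
      # IP of the my-cluster-kafka-bootstrap service, plain listener port
      bootstrapServers: "<IP>:9092"
      topic: "my-topic"
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: aws-s3-sink
    properties:
      bucketNameOrArn: "strimzi-bucket"
      accessKey: "<access_key>"
      secretKey: "<secret_key>"
      region: "eu-west-1"
```

A timer-kafka.yaml binding would presumably mirror this shape, with timer-source as the source and kafka-not-secured-sink as the sink, as the timer-kafka-binding startup log suggests.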