This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git
The following commit(s) were added to refs/heads/main by this push:
     new 9c05798  Added an Example of Kafka to AWS Bedrock Text Sink (#75)
9c05798 is described below

commit 9c05798c04667efe1ca3f192258b191c688de3f3
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Thu Mar 7 07:08:35 2024 +0100

    Added an Example of Kafka to AWS Bedrock Text Sink (#75)

    Signed-off-by: Andrea Cosentino <anco...@gmail.com>
---
 jbang/kafka-aws-bedrock/README.adoc                | 184 +++++++++++++++++++++
 .../kafka-aws-bedrock/kafka-aws-bedrock.camel.yaml |  35 ++++
 jbang/kafka-aws-bedrock/prompt1.txt                |   1 +
 jbang/kafka-aws-bedrock/prompt2.txt                |   1 +
 4 files changed, 221 insertions(+)

diff --git a/jbang/kafka-aws-bedrock/README.adoc b/jbang/kafka-aws-bedrock/README.adoc
new file mode 100644
index 0000000..670647c
--- /dev/null
+++ b/jbang/kafka-aws-bedrock/README.adoc
@@ -0,0 +1,184 @@
+== Kafka to AWS Bedrock Text Sink Kamelet
+
+In this sample you'll use the Kafka Source Kamelet and the AWS Bedrock Text Sink Kamelet.
+
+=== Install JBang
+
+First install JBang according to https://www.jbang.dev
+
+Once JBang is installed, you should be able to run the following from a shell:
+
+[source,sh]
+----
+$ jbang --version
+----
+
+This will output the version of JBang.
+
+To run this example you can install Camel on JBang via:
+
+[source,sh]
+----
+$ jbang app install camel@apache/camel
+----
+
+This allows you to run Camel JBang with `camel`, as shown below.
+
+=== Set up a Kafka instance
+
+You'll need a running Kafka cluster to point to. In this case you could use an Ansible role like https://github.com/oscerd/kafka-ansible-role
+
+Set up a file `deploy.yaml` with the following content:
+
+```yaml
+- name: role kafka
+  hosts: localhost
+  remote_user: user
+
+  roles:
+    - role: kafka-ansible-role
+      kafka_version: 3.6.1
+      path_dir: /home/user/
+      unarchive_dest_dir: /home/user/kafka/demo/
+      start_kafka: true
+```
+
+and then run:
+
+```shell script
+ansible-playbook -v deploy.yaml
+```
+
+This should start a Kafka instance on your local machine.
+
+=== Set up AWS Bedrock
+
+The Kamelet expects the AWS credentials to be placed in `/home/<user>/.aws/credentials`.
+
+That's why the `useDefaultCredentialsProvider` option is set to `true` in the AWS Bedrock Text Sink configuration.
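+
+For reference, a minimal credentials file (with placeholder values) has this shape:
+
+[source,ini]
+----
+[default]
+aws_access_key_id = <your-access-key-id>
+aws_secret_access_key = <your-secret-access-key>
+----
+
+The default credentials provider chain will also pick up credentials from the standard AWS environment variables or from an attached IAM role, if you prefer not to use the credentials file.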
+
+=== How to run
+
+Then you can run this example using:
+
+[source,sh]
+----
+$ jbang -Dcamel.jbang.version=4.5.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<local_path_to_kamelets> kafka-aws-bedrock.camel.yaml
+----
+
+
+=== Sending two messages
+
+In the log you should see the consumer:
+
+[source,sh]
+----
+2024-03-07 06:50:53.517 INFO 11498 --- [ main] el.impl.engine.AbstractCamelContext : Routes startup (total:1 started:1 kamelets:2)
+2024-03-07 06:50:53.517 INFO 11498 --- [ main] el.impl.engine.AbstractCamelContext : Started route1 (kamelet://kafka-not-secured-source)
+2024-03-07 06:50:53.517 INFO 11498 --- [ main] el.impl.engine.AbstractCamelContext : Apache Camel 4.5.0-SNAPSHOT (kafka-aws-bedrock) started in 817ms (build:0ms init:0ms start:817ms)
+2024-03-07 06:50:53.621 INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
+2024-03-07 06:50:53.622 INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
+2024-03-07 06:50:53.622 INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1709790653615
+2024-03-07 06:50:53.634 INFO 11498 --- [[bedrock-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2024-03-07 06:50:53.634 INFO 11498 --- [[bedrock-topic]] l.component.kafka.KafkaFetchRecords : Subscribing bedrock-topic-Thread 0 to topic bedrock-topic
+2024-03-07 06:50:53.636 INFO 11498 --- [[bedrock-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Subscribed to topic(s): bedrock-topic
+2024-03-07 06:50:54.023 WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 2 : {bedrock-topic=LEADER_NOT_AVAILABLE}
+2024-03-07 06:50:54.025 INFO 11498 --- [[bedrock-topic]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Cluster ID: Wq0puR18So6u4dXsKJnMNg
+2024-03-07 06:50:54.140 WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 4 : {bedrock-topic=LEADER_NOT_AVAILABLE}
+2024-03-07 06:50:54.263 WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 6 : {bedrock-topic=LEADER_NOT_AVAILABLE}
+2024-03-07 06:50:54.371 WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 8 : {bedrock-topic=LEADER_NOT_AVAILABLE}
+2024-03-07 06:50:54.784 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2024-03-07 06:50:54.789 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] (Re-)joining group
+2024-03-07 06:50:54.821 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Request joining group due to: need to re-join with the given member-id: consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276
+2024-03-07 06:50:54.823 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
+2024-03-07 06:50:54.823 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] (Re-)joining group
+2024-03-07 06:50:54.846 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Successfully joined group with generation Generation{generationId=1, memberId='consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276', protocol='range'}
+2024-03-07 06:50:54.857 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Finished assignment for group at generation 1: {consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276=Assignment(partitions=[bedrock-topic-0])}
+2024-03-07 06:50:54.934 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Successfully synced group in generation Generation{generationId=1, memberId='consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276', protocol='range'}
+2024-03-07 06:50:54.935 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Notifying assignor about the new Assignment(partitions=[bedrock-topic-0])
+2024-03-07 06:50:54.937 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Adding newly assigned partitions: bedrock-topic-0
+2024-03-07 06:50:54.962 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Found no committed offset for partition bedrock-topic-0
+2024-03-07 06:50:54.989 INFO 11498 --- [[bedrock-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Resetting offset for partition bedrock-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}.
+----
+
+In the example folder you have two different messages, prompt1.txt and prompt2.txt: the first one requests the generation of JSON records and the second one XML records. Both of them ask for an id field, a product name and a price.
+
+We use kcat to send the messages to Kafka so that the route can consume them.
+
+[source,sh]
+----
+$ kcat -P -b localhost:9092 -t bedrock-topic prompt1.txt
+$ kcat -P -b localhost:9092 -t bedrock-topic prompt2.txt
+----
+
+If you look in the example folder, you should now see a results folder:
+
+[source,sh]
+----
+$ cd results/
+$ cat *
+----
+
+You'll find two different files, and if you cat their content you should see:
+
+[source,sh]
+----
+```tabular-data-json
+{
+    "rows": [
+        {
+            "id": 1,
+            "product_name": "Phone 12",
+            "price": "$999"
+        },
+        {
+            "id": 2,
+            "product_name": "Phone 11",
+            "price": "$899"
+        },
+        {
+            "id": 3,
+            "product_name": "Phone X",
+            "price": "$799"
+        },
+        {
+            "id": 4,
+            "product_name": "Phone XS",
+            "price": "$699"
+        },
+        {
+            "id": 5,
+            "product_name": "Phone XR",
+            "price": "$599"
+        }
+    ]
+}
+```
+```tabular-data-xml
+<record id="1">
+    <product name="Apple iPhone 12 Pro Max">$1,099</product>
+</record>
+<record id="2">
+    <product name="Samsung Galaxy S21 Ultra 5G">$1,199</product>
+</record>
+<record id="3">
+    <product name="Google Pixel 6 Pro">$899</product>
+</record>
+<record id="4">
+    <product name="Apple iPhone 11 Pro">$999</product>
+</record>
+<record id="5">
+    <product name="Samsung Galaxy Note 20 Ultra 5G">$1,299</product>
+</record>
+```
+----
+
+=== Help and contributions
+
+If you hit any problem using Camel or have some feedback, then please
+https://camel.apache.org/community/support/[let us know].
+
+We also love contributors, so
+https://camel.apache.org/community/contributing/[get involved] :-)
+
+The Camel riders!
diff --git a/jbang/kafka-aws-bedrock/kafka-aws-bedrock.camel.yaml b/jbang/kafka-aws-bedrock/kafka-aws-bedrock.camel.yaml
new file mode 100644
index 0000000..7b0680b
--- /dev/null
+++ b/jbang/kafka-aws-bedrock/kafka-aws-bedrock.camel.yaml
@@ -0,0 +1,35 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+##     http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+- route:
+    from:
+      uri: "kamelet:kafka-not-secured-source"
+      parameters:
+        topic: "bedrock-topic"
+        bootstrapServers: "localhost:9092"
+        groupId: 'my-consumer-group'
+      steps:
+        - to:
+            uri: "kamelet:aws-bedrock-text-sink"
+            parameters:
+              region: "us-east-1"
+              useDefaultCredentialsProvider: "true"
+              modelId: "amazon.titan-text-express-v1"
+        - transform:
+            jq:
+              expression: ".results[0].outputText"
+        - to: "file:./results/"
diff --git a/jbang/kafka-aws-bedrock/prompt1.txt b/jbang/kafka-aws-bedrock/prompt1.txt
new file mode 100644
index 0000000..d8def8d
--- /dev/null
+++ b/jbang/kafka-aws-bedrock/prompt1.txt
@@ -0,0 +1 @@
+{"inputText":"User: Can you please generate five JSON records, with an id field, product name and price? The output should be in JSON format.","textGenerationConfig":{"maxTokenCount":1024,"stopSequences":["User:"],"temperature":0,"topP":1}}
diff --git a/jbang/kafka-aws-bedrock/prompt2.txt b/jbang/kafka-aws-bedrock/prompt2.txt
new file mode 100644
index 0000000..0503ff7
--- /dev/null
+++ b/jbang/kafka-aws-bedrock/prompt2.txt
@@ -0,0 +1 @@
+{"inputText":"User: Can you please generate five XML records, with an id field, product name and price? The output should be in XML format.","textGenerationConfig":{"maxTokenCount":1024,"stopSequences":["User:"],"temperature":0,"topP":1}}
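
For context on the `jq` transform in the route above: Amazon Titan text models return a JSON payload roughly shaped like the sketch below (the values shown here are illustrative), which is why the expression `.results[0].outputText` is used to extract only the generated text before it is written to the results folder.

```json
{
  "inputTextTokenCount": 42,
  "results": [
    {
      "tokenCount": 256,
      "outputText": "...the generated JSON or XML records...",
      "completionReason": "FINISH"
    }
  ]
}
```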