This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch kafka-ibm-mq in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git
commit 230b81a26d5d09fff372649f7bed03ffb2e43b29 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Wed Mar 1 14:56:11 2023 +0100 Add Example for Kafka to IBM MQ Signed-off-by: Andrea Cosentino <anco...@gmail.com> --- jbang/kafka-mq/README.adoc | 172 +++++++++++++++++++++++++++++++++++++++++++ jbang/kafka-mq/kafka-mq.yaml | 22 ++++++ jbang/kafka-mq/mq-log.yaml | 19 +++++ 3 files changed, 213 insertions(+) diff --git a/jbang/kafka-mq/README.adoc b/jbang/kafka-mq/README.adoc new file mode 100644 index 0000000..0f33e93 --- /dev/null +++ b/jbang/kafka-mq/README.adoc @@ -0,0 +1,172 @@ +== Kafka to IBM MQ and consuming messages example + +In this sample you'll use the Kafka Source Kamelet and the IBM MQ Sink one. + +=== Install JBang + +First install JBang according to https://www.jbang.dev + +When JBang is installed then you should be able to run from a shell: + +[source,sh] +---- +$ jbang --version +---- + +This will output the version of JBang. + +To run this example you can either install Camel on JBang via: + +[source,sh] +---- +$ jbang app install camel@apache/camel +---- + +Which allows to run CamelJBang with `camel` as shown below. + +=== Setup Kafka instance + +You'll need to run a Kafka cluster to point to. In this case you could use an ansible role like https://github.com/oscerd/kafka-ansible-role + +And set up a file deploy.yaml with the following content: + +```yaml +- name: role kafka + hosts: localhost + remote_user: user + + roles: + - role: kafka-ansible-role + kafka_version: 3.2.3 + path_dir: /home/user/ + unarchive_dest_dir: /home/user/kafka/demo/ + start_kafka: true +``` + +and then run + +```shell script +ansible-playbook -v deploy.yaml +``` + +This should start a Kafka instance for you, on your local machine. + +=== Set up IBM MQ + +We could do this through running the following docker container: + +```bash +docker volume create qm1data +``` + +and then run + +```bash +docker run --env LICENSE=accept --env MQ_QMGR_NAME=QM1 --volume qm1data:/mnt/mqm --publish 1414:1414 --publish 9443:9443 --detach --env MQ_APP_PASSWORD=passw0rd --name QM1 icr.io/ibm-messaging/mq:latest +``` + +To check everything is fine: + +```bash +docker exec -ti QM1 bash +dspmq +``` + +This should give a RUNNING state. + +=== How to run + +Then you can run this example using: + +[source,sh] +---- +$ jbang run camel@apache/camel run -Dcamel.jbang.version=3.20.2 --local-kamelet-dir=<local-kamelets-dir> kafka-mq.yaml +---- + +[source,sh] +---- +$ jbang run camel@apache/camel run -Dcamel.jbang.version=3.20.2 --local-kamelet-dir=<local-kamelets-dir> mq-log.yaml +---- + +Replace the local kamelet dir with your local directory. + +Don't forget to get the IBM MQ container IP Address and use it as serverName parameter in the Yaml for kafka-mq and mq-log. + +=== Create and delete an object + +For Kafka-mq integration you should see: + +[source,sh] +---- +2023-03-01 14:50:55.503 INFO 128819 --- [onsumer[test-1]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.2.3 +2023-03-01 14:50:55.503 INFO 128819 --- [onsumer[test-1]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 50029d3ed8ba576f +2023-03-01 14:50:55.503 INFO 128819 --- [onsumer[test-1]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1677678655501 +2023-03-01 14:50:55.509 INFO 128819 --- [onsumer[test-1]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy +2023-03-01 14:50:55.509 INFO 128819 --- [onsumer[test-1]] l.component.kafka.KafkaFetchRecords : Subscribing test-1-Thread 0 to topic test-1 +2023-03-01 14:50:55.510 INFO 128819 --- [onsumer[test-1]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Subscribed to topic(s): test-1 +2023-03-01 14:50:55.782 WARN 128819 --- [onsumer[test-1]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Error while fetching metadata with correlation id 2 : {test-1=LEADER_NOT_AVAILABLE} +2023-03-01 14:50:55.783 INFO 128819 --- [onsumer[test-1]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Cluster ID: hyLj5ExnQW-U5CGu7xZsig +2023-03-01 14:50:55.784 INFO 128819 --- [onsumer[test-1]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null) +2023-03-01 14:50:55.791 INFO 128819 --- [onsumer[test-1]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] (Re-)joining group +2023-03-01 14:50:55.808 INFO 128819 --- [onsumer[test-1]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Request joining group due to: need to re-join with the given member-id: consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1-51e9922f-a1a6-4fb8-a035-5902a5c6c495 +2023-03-01 14:50:55.809 INFO 128819 --- [onsumer[test-1]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) +2023-03-01 14:50:55.809 INFO 128819 --- [onsumer[test-1]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] (Re-)joining group +2023-03-01 14:50:55.811 INFO 128819 --- [onsumer[test-1]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Successfully joined group with generation Generation{generationId=1, memberId='consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1-51e9922f-a1a6-4fb8-a035-5902a5c6c495', protocol='range'} +2023-03-01 14:50:55.887 INFO 128819 --- [onsumer[test-1]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Resetting the last seen epoch of partition test-1-0 to 0 since the associated topicId changed from null to Lo2u-G3-Skug-NHd4AZJYg +2023-03-01 14:50:55.899 INFO 128819 --- [onsumer[test-1]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Finished assignment for group at generation 1: {consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1-51e9922f-a1a6-4fb8-a035-5902a5c6c495=Assignment(partitions=[test-1-0])} +2023-03-01 14:50:55.911 INFO 128819 --- [onsumer[test-1]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Successfully synced group in generation Generation{generationId=1, memberId='consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1-51e9922f-a1a6-4fb8-a035-5902a5c6c495', protocol='range'} +2023-03-01 14:50:55.913 INFO 128819 --- [onsumer[test-1]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Notifying assignor about the new Assignment(partitions=[test-1-0]) +2023-03-01 14:50:55.922 INFO 128819 --- [onsumer[test-1]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Adding newly assigned partitions: test-1-0 +2023-03-01 14:50:55.933 INFO 128819 --- [onsumer[test-1]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Found no committed offset for partition test-1-0 +2023-03-01 14:50:55.942 INFO 128819 --- [onsumer[test-1]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-3b81e87c-6188-40f2-bb9b-872b28568949-1, groupId=3b81e87c-6188-40f2-bb9b-872b28568949] Resetting offset for partition test-1-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}. +---- + +At this point you should run the consumer too, mq-log.yaml: + +[source,sh] +---- +2023-03-01 14:51:55.296 INFO 129025 --- [ main] org.apache.camel.main.MainSupport : Apache Camel (JBang) 3.20.2 is starting +2023-03-01 14:51:55.509 INFO 129025 --- [ main] org.apache.camel.main.MainSupport : Using Java 11.0.16.1 with PID 129025. Started by oscerd in /home/oscerd/workspace/apache-camel/camel-kamelets-examples/jbang +2023-03-01 14:51:55.524 INFO 129025 --- [ main] mel.cli.connector.LocalCliConnector : Camel CLI enabled (local) +2023-03-01 14:51:57.128 INFO 129025 --- [ main] el.impl.engine.AbstractCamelContext : Apache Camel 3.20.2 (mq-log) is starting +2023-03-01 14:51:57.484 INFO 129025 --- [ main] g.apache.camel.main.BaseMainSupport : Property-placeholders summary +2023-03-01 14:51:57.485 INFO 129025 --- [ main] g.apache.camel.main.BaseMainSupport : [s-ibm-mq-source.kamelet.yaml] destinationName=DEV.QUEUE.1 +2023-03-01 14:51:57.485 INFO 129025 --- [ main] g.apache.camel.main.BaseMainSupport : [s-ibm-mq-source.kamelet.yaml] wmqConnectionFactory=wmqConnectionFactory-1 +2023-03-01 14:51:57.485 INFO 129025 --- [ main] g.apache.camel.main.BaseMainSupport : [s-ibm-mq-source.kamelet.yaml] password=xxxxxx +2023-03-01 14:51:57.485 INFO 129025 --- [ main] g.apache.camel.main.BaseMainSupport : [s-ibm-mq-source.kamelet.yaml] username=xxxxxx +2023-03-01 14:51:57.485 INFO 129025 --- [ main] g.apache.camel.main.BaseMainSupport : [s-ibm-mq-source.kamelet.yaml] channel=DEV.APP.SVRCONN +2023-03-01 14:51:57.485 INFO 129025 --- [ main] g.apache.camel.main.BaseMainSupport : [s-ibm-mq-source.kamelet.yaml] serverName=172.17.0.2 +2023-03-01 14:51:57.485 INFO 129025 --- [ main] g.apache.camel.main.BaseMainSupport : [s-ibm-mq-source.kamelet.yaml] queueManager=QM1 +2023-03-01 14:51:57.485 INFO 129025 --- [ main] g.apache.camel.main.BaseMainSupport : [log-sink.kamelet.yaml] showHeaders=true +2023-03-01 14:51:57.486 INFO 129025 --- [ main] g.apache.camel.main.BaseMainSupport : [log-sink.kamelet.yaml] showStreams=true +2023-03-01 14:51:57.504 INFO 129025 --- [ main] el.impl.engine.AbstractCamelContext : Routes startup (started:3) +2023-03-01 14:51:57.504 INFO 129025 --- [ main] el.impl.engine.AbstractCamelContext : Started route1 (kamelet://jms-ibm-mq-source) +2023-03-01 14:51:57.505 INFO 129025 --- [ main] el.impl.engine.AbstractCamelContext : Started jms-ibm-mq-source-1 (jms://queue:DEV.QUEUE.1) +2023-03-01 14:51:57.505 INFO 129025 --- [ main] el.impl.engine.AbstractCamelContext : Started log-sink-2 (kamelet://source) +2023-03-01 14:51:57.505 INFO 129025 --- [ main] el.impl.engine.AbstractCamelContext : Apache Camel 3.20.2 (mq-log) started in 1s481ms (build:102ms init:1s3ms start:376ms JVM-uptime:3s) +---- + +Now it's time to send something to Kafka, we are going to use kcat utility: + +[source,sh] +---- +echo "Camel message!" | kcat -b localhost:9092 -P -t test-1 -H "header1=myHeaderValue" -H "JMSCorrelationID=test" +---- + +On the mq-log integration you should now see a message logged: + +[source,sh] +---- +2023-03-01 14:53:36.589 INFO 129025 --- [er[DEV.QUEUE.1]] log-sink : Exchange[ExchangePattern: InOnly, Headers: {CamelMessageTimestamp=1677678816530, header1=myHeaderValue, JMS_IBM_Character_Set=UTF-8, JMS_IBM_Encoding=273, JMS_IBM_Format=MQSTR , JMS_IBM_MsgType=8, JMS_IBM_PutApplType=28, JMS_IBM_PutDate=20230301, JMS_IBM_PutTime=13533653, JMSCorrelationID=test, JMSCorrelationIDAsBytes=test, JMSDeliveryMode=2, JMSDestination=queue:///DEV.QUEUE.1, JMSExpirati [...] +---- + + +=== Help and contributions + +If you hit any problem using Camel or have some feedback, then please +https://camel.apache.org/community/support/[let us know]. + +We also love contributors, so +https://camel.apache.org/community/contributing/[get involved] :-) + +The Camel riders! diff --git a/jbang/kafka-mq/kafka-mq.yaml b/jbang/kafka-mq/kafka-mq.yaml new file mode 100644 index 0000000..bb135bd --- /dev/null +++ b/jbang/kafka-mq/kafka-mq.yaml @@ -0,0 +1,22 @@ +# camel-k: dependency=mvn:com.ibm.mq:com.ibm.mq.allclient:9.2.5.0 +# camel-k: dependency=mvn:org.apache.camel.kamelets:camel-kamelets-utils:3.20.1.1 + +- route: + from: + uri: "kamelet:kafka-not-secured-source" + parameters: + bootstrapServers: "localhost:9092" + topic: test-1 + deserializeHeaders: true + steps: + - to: + uri: "kamelet:jms-ibm-mq-sink" + parameters: + channel: DEV.APP.SVRCONN + destinationName: DEV.QUEUE.1 + password: "passw0rd" + queueManager: QM1 + serverName: 172.17.0.2 + serverPort: 1414 + username: app + diff --git a/jbang/kafka-mq/mq-log.yaml b/jbang/kafka-mq/mq-log.yaml new file mode 100644 index 0000000..8816703 --- /dev/null +++ b/jbang/kafka-mq/mq-log.yaml @@ -0,0 +1,19 @@ +# camel-k: dependency=mvn:com.ibm.mq:com.ibm.mq.allclient:9.2.5.0 + +- route: + from: + uri: "kamelet:jms-ibm-mq-source" + parameters: + channel: DEV.APP.SVRCONN + destinationName: DEV.QUEUE.1 + password: "passw0rd" + queueManager: QM1 + serverName: 172.17.0.2 + serverPort: 1414 + username: app + steps: + - to: + uri: "kamelet:log-sink" + parameters: + showStreams: true + showHeaders: true