This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch release-1.4.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit b0bfd89b9f22bfd63204ae3fefbbd10da950770d
Author: Pasquale Congiusti <pasquale.congiu...@gmail.com>
AuthorDate: Thu Apr 29 12:22:47 2021 +0200

    refactor(example): error handler using a real Kafka topic
---
 .../error-handler/error-handler.kamelet.yaml | 45 +++++++++++--
 .../kamelet-binding-error-handler.yaml       | 19 ++----
 examples/kamelets/error-handler/readme.md    | 76 +++++++++++++++++++---
 3 files changed, 112 insertions(+), 28 deletions(-)

diff --git a/examples/kamelets/error-handler/error-handler.kamelet.yaml b/examples/kamelets/error-handler/error-handler.kamelet.yaml
index a5aa060..04ce557 100644
--- a/examples/kamelets/error-handler/error-handler.kamelet.yaml
+++ b/examples/kamelets/error-handler/error-handler.kamelet.yaml
@@ -21,20 +21,53 @@ metadata:
   name: error-handler
 spec:
   definition:
-    title: "Error Log Sink"
-    description: "Error logger channel"
+    title: "DLC and Log"
+    description: "Push an event to a Kafka topic and log an error message"
     required:
-      - message
+      - kafka-brokers
+      - kafka-topic
+      - kafka-service-account-id
+      - kafka-service-account-secret
+      - log-message
     properties:
-      message:
+      kafka-brokers:
+        title: Kafka Brokers
+        description: the bootstrap server
+        type: string
+        example: "xyz-ins--rplssqfz-yyyyyyy-crojg.bf2.kafka.my-clud-service.com:443"
+      kafka-topic:
+        title: Kafka Topic
+        description: the DLC topic
+        type: string
+        example: "my-dlc"
+      kafka-service-account-id:
+        title: Service Account ID
+        description: the SA to use
+        type: string
+        example: "srvc-acct-xxxxxx-519b-453f-9f68-yyyyyyyyy"
+      kafka-service-account-secret:
+        title: Service Account Secret
+        description: the SA secret to use
+        type: string
+        example: "xxxxxxxxx-46c7-4c6c-a753-yyyyyyyyyyyyyyy"
+      log-message:
         title: Message
-        description: The message to log
+        description: a warning message to log
         type: string
         example: "error while checking the source"
   flow:
     from:
       uri: kamelet:source
       steps:
+      # First step: send to the DLC for future processing
+      - to:
+          uri: kafka:{{kafka-topic}}
+          parameters:
+            brokers: "{{kafka-brokers}}"
+            security-protocol: SASL_SSL
+            sasl-mechanism: PLAIN
+            sasl-jaas-config: "org.apache.kafka.common.security.plain.PlainLoginModule required username={{kafka-service-account-id}} password={{kafka-service-account-secret}};"
+      # Log an error message to notify about the failure
       - set-body:
-          constant: "{{message}}"
+          constant: "{{log-message}} - worry not, the event is stored in the DLC"
       - to: "log:error-sink"
diff --git a/examples/kamelets/error-handler/kamelet-binding-error-handler.yaml b/examples/kamelets/error-handler/kamelet-binding-error-handler.yaml
index 7a935d6..3d3c3a5 100644
--- a/examples/kamelets/error-handler/kamelet-binding-error-handler.yaml
+++ b/examples/kamelets/error-handler/kamelet-binding-error-handler.yaml
@@ -30,16 +30,7 @@ spec:
       kind: Kamelet
       apiVersion: camel.apache.org/v1alpha1
       name: log-sink
-  errorHandler:
-#    log:
-#      parameters:
-#        maximumRedeliveries: 3
-#        redeliveryDelay: 2000
-#    ref: "something"
-#    bean:
-#      type: "org.apache.camel.builder.DeadLetterChannelBuilder"
-#      properties:
-#        deadLetterUri: log:error
+  errorHandler:
     dead-letter-channel:
       endpoint:
         ref:
@@ -47,8 +38,12 @@ spec:
           apiVersion: camel.apache.org/v1alpha1
           name: error-handler
         properties:
-          message: "ERROR!"
+          log-message: "ERROR!"
+          kafka-brokers: ***
+          kafka-topic: my-first-test
+          kafka-service-account-id: ***
+          kafka-service-account-secret: ***
       parameters:
-        maximumRedeliveries: 3
+        maximumRedeliveries: 1
         redeliveryDelay: 2000
diff --git a/examples/kamelets/error-handler/readme.md b/examples/kamelets/error-handler/readme.md
index 42293cd..abf8ad6 100644
--- a/examples/kamelets/error-handler/readme.md
+++ b/examples/kamelets/error-handler/readme.md
@@ -1,5 +1,5 @@
 # Kamelets Binding Error Handler example
-This example shows how to create a simple _source_ `Kamelet` which sends periodically events (and certain failures). The events are consumed by a log _sink_ in a `KameletBinding`. With the support of the `ErrorHandler` we will be able to redirect all errors to a `Dead Letter Channel` _error-handler_ `Kamelet`.
+This example shows how to create a simple _source_ `Kamelet` which periodically sends events (and certain failures). The events are consumed by a log _sink_ in a `KameletBinding`. With the support of the `ErrorHandler` we will be able to redirect all errors to a `Dead Letter Channel` _error-handler_ `Kamelet` whose goal is to store the events in a `Kafka` topic and provide a log message notifying us about the error that happened.
 ## Incremental ID Source Kamelet
 First of all, you must install the _incremental-id-source_ Kamelet defined in `incremental-id-source.kamelet.yaml` file. This source will emit events every second with an autoincrement counter that will be forced to fail when the number 0 is caught. With this trick, we will simulate possible event faults.
@@ -27,11 +27,51 @@ log-sink                   Ready
 incremental-id-source      Ready
 ```
 ## Error handler Kamelet
-We finally install an error handler as specified in `error-handler.kamelet.yaml` file. This is a simple logger, but you can use any endpoint to collect and store the failing events.
+We finally install an error handler as specified in the `error-handler.kamelet.yaml` file. Let's have a look at how it is configured:
+
+```
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: error-handler
+spec:
+  definition:
+    ...
+    properties:
+      kafka-brokers:
+        ...
+      kafka-topic:
+        ...
+      kafka-service-account-id:
+        ...
+      kafka-service-account-secret:
+        ...
+      log-message:
+        ...
+  flow:
+    from:
+      uri: kamelet:source
+      steps:
+      # First step: send to the DLC for future processing
+      - to:
+          uri: kafka:{{kafka-topic}}
+          parameters:
+            brokers: "{{kafka-brokers}}"
+            security-protocol: SASL_SSL
+            sasl-mechanism: PLAIN
+            sasl-jaas-config: "org.apache.kafka.common.security.plain.PlainLoginModule required username={{kafka-service-account-id}} password={{kafka-service-account-secret}};"
+      # Log an error message to notify about the failure
+      - set-body:
+          constant: "{{log-message}} - worry not, the event is stored in the DLC"
+      - to: "log:error-sink"
+```
+
+We first send the errored event to a Kafka topic, and then we send a simple notification message to the output, just to let the user know that an issue happened. Let's apply it:
+
 ```
 $ kubectl apply -f error-handler.kamelet.yaml
 ```
-You can check the newly created `kamelet` checking the list of kamelets available:
+You can check the newly created `kamelet` by listing the kamelets available:
 ```
 $ kubectl get kamelets
@@ -41,7 +81,7 @@ log-sink                   Ready
 incremental-id-source      Ready
 ```
 ## Error Handler Kamelet Binding
-We can create a `KameletBinding` which is started by the _incremental-id-source_ `Kamelet` and log events to _log-sink_ `Kamelet`. As this will sporadically fail, we can configure an _errorHandler_ with the _error-handler_ `Kamelet` as **Dead Letter Channel**. We want to configure also some redelivery policies. We can declare it as in `kamelet-binding-error-handler.yaml` file:
+We can now create a `KameletBinding` which is started by the _incremental-id-source_ `Kamelet` and logs events to the _log-sink_ `Kamelet`. As this will sporadically fail, we can configure an _errorHandler_ with the _error-handler_ `Kamelet` as a **Dead Letter Channel**. We also want to configure some redelivery policies (1 retry, with a 2000 millisecond delay). We can declare it as in the `kamelet-binding-error-handler.yaml` file:
 ```
 ...
   errorHandler:
@@ -54,7 +94,7 @@ We can create a `KameletBinding` which is started by the _incremental-id-source_
     properties:
       message: "ERROR!"
     parameters:
-      maximumRedeliveries: 3
+      maximumRedeliveries: 1
       redeliveryDelay: 2000
 ```
 Execute the following command to start the `Integration`:
@@ -63,9 +103,25 @@ kubectl apply -f kamelet-binding-error-handler.yaml
 ```
 As soon as the `Integration` starts, it will log the events on the `ok` log channel and errors on the `error` log channel:
 ```
-[1] 2021-04-21 13:03:43,773 INFO  [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #8]
-[1] 2021-04-21 13:03:44,774 INFO  [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #9]
-[1] 2021-04-21 13:03:45,898 INFO  [error-sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: ERROR!]
-[1] 2021-04-21 13:09:46,775 INFO  [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #11]
+[1] 2021-04-29 08:35:08,875 INFO  [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #49]
+[1] 2021-04-29 08:35:11,878 INFO  [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #51]
+[1] 2021-04-29 08:35:12,088 INFO  [error-sink] (Camel (camel-1) thread #9 - KafkaProducer[my-first-test]) Exchange[ExchangePattern: InOnly, BodyType: String, Body: ERROR! - worry not, the event is stored in the DLC]
+[1] 2021-04-29 08:35:12,877 INFO  [sink] (Camel (camel-1) thread #0 - timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: Producing message #52]
 ```
-This example is useful to guide you through the configuration of an error handler. In a production environment you will likely configure the error handler `Kamelet` pointing to a persistent queue.
\ No newline at end of file
+
+### Recover the errors from the DLC
+
+If you're curious to know what was going on on the DLC side, you can use the example found in [kafka sasl ssl consumer](../kafka/sasl_ssl/):
+
+```
+kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev
+...
+[1] 2021-04-29 08:57:08,636 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Kafka commitId: 448719dc99a19793
+[1] 2021-04-29 08:57:08,636 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Kafka startTimeMs: 1619686628635
+[1] 2021-04-29 08:57:08,637 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Subscribing my-first-test-Thread 0 to topic my-first-test
+...
+[1] 2021-04-29 08:35:02,894 INFO  [FromKafka] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Producing message #40
+[1] 2021-04-29 08:35:12,995 INFO  [FromKafka] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Producing message #50
+[1] 2021-04-29 08:35:22,879 INFO  [FromKafka] (Camel (camel-1) thread #0 - KafkaConsumer[my-first-test]) Producing message #60
+...
+```
\ No newline at end of file
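
Editor's note: the binding above sets `maximumRedeliveries: 1` and `redeliveryDelay: 2000`, meaning one original delivery attempt, one redelivery after a 2-second pause, and only then the diversion to the dead letter channel. As a rough illustration of those semantics only — this is not Camel's actual `DeadLetterChannelBuilder`, and the class and method names below are invented for the sketch — the behavior can be expressed in plain Java:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of dead-letter-channel redelivery semantics; not Camel internals.
public class DeadLetterChannelSketch {
    final int maximumRedeliveries;      // extra attempts after the first delivery
    final long redeliveryDelayMillis;   // pause between attempts
    final Consumer<String> deadLetterEndpoint; // e.g. the error-handler Kamelet (Kafka + log)

    DeadLetterChannelSketch(int maximumRedeliveries, long redeliveryDelayMillis,
                            Consumer<String> deadLetterEndpoint) {
        this.maximumRedeliveries = maximumRedeliveries;
        this.redeliveryDelayMillis = redeliveryDelayMillis;
        this.deadLetterEndpoint = deadLetterEndpoint;
    }

    /** Try the original delivery plus up to maximumRedeliveries retries;
     *  when all attempts fail, divert the event to the DLC instead of losing it. */
    void process(String event, Consumer<String> sink) throws InterruptedException {
        for (int attempt = 0; attempt <= maximumRedeliveries; attempt++) {
            try {
                sink.accept(event);
                return; // delivered successfully, no error handling needed
            } catch (RuntimeException e) {
                if (attempt < maximumRedeliveries) {
                    Thread.sleep(redeliveryDelayMillis); // wait before redelivering
                }
            }
        }
        deadLetterEndpoint.accept(event); // retries exhausted: park the event in the DLC
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> dlc = new ArrayList<>();
        // 1 retry, 10 ms delay (shortened from 2000 ms for the demo)
        DeadLetterChannelSketch handler = new DeadLetterChannelSketch(1, 10, dlc::add);
        // A sink that always fails, like the forced failure on counter 0 in the example source
        handler.process("Producing message #0",
                ev -> { throw new RuntimeException("simulated fault"); });
        System.out.println("DLC contains: " + dlc);
    }
}
```

The key design point the commit demonstrates is the same as in the sketch: the failing event is stored (here a list, in the example a Kafka topic) rather than merely logged, so it can be recovered and reprocessed later.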