This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch release-1.4.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit b0bfd89b9f22bfd63204ae3fefbbd10da950770d
Author: Pasquale Congiusti <pasquale.congiu...@gmail.com>
AuthorDate: Thu Apr 29 12:22:47 2021 +0200

    refactor(example): error handler using a real Kafka topic
---
 .../error-handler/error-handler.kamelet.yaml       | 45 +++++++++++--
 .../kamelet-binding-error-handler.yaml             | 19 ++----
 examples/kamelets/error-handler/readme.md          | 76 +++++++++++++++++++---
 3 files changed, 112 insertions(+), 28 deletions(-)

diff --git a/examples/kamelets/error-handler/error-handler.kamelet.yaml 
b/examples/kamelets/error-handler/error-handler.kamelet.yaml
index a5aa060..04ce557 100644
--- a/examples/kamelets/error-handler/error-handler.kamelet.yaml
+++ b/examples/kamelets/error-handler/error-handler.kamelet.yaml
@@ -21,20 +21,53 @@ metadata:
   name: error-handler
 spec:
   definition:
-    title: "Error Log Sink"
-    description: "Error logger channel"
+    title: "DLC and Log"
+    description: "Push an event to a kafka topic and log an error message"
     required:
-      - message
+      - kafka-brokers
+      - kafka-topic
+      - kafka-service-account-id
+      - kafka-service-account-secret
+      - log-message
     properties:
-      message:
+      kafka-brokers:
+        title: Kafka Brokers
+        description: the bootstrap server
+        type: string
+        example: 
"xyz-ins--rplssqfz-yyyyyyy-crojg.bf2.kafka.my-clud-service.com:443"    
+      kafka-topic:
+        title: Kafka Topic
+        description: the DLC topic
+        type: string
+        example: "my-dlc"    
+      kafka-service-account-id:
+        title: Service Account ID
+        description: the SA to use
+        type: string
+        example: "srvc-acct-xxxxxx-519b-453f-9f68-yyyyyyyyy"    
+      kafka-service-account-secret:
+        title: Service Account Secret
+        description: the SA secrete to use
+        type: string
+        example: "xxxxxxxxx-46c7-4c6c-a753-yyyyyyyyyyyyyyy"                
+      log-message:
         title: Message
-        description: The message to log
+        description: A message warning to log
         type: string
         example: "error while checking the source"    
   flow:
     from:
       uri: kamelet:source
       steps:
+      # First step: send to the DLC for future processing
+      - to:
+          uri: kafka:{{kafka-topic}}
+          parameters:
+            brokers: "{{kafka-brokers}}"
+            security-protocol: SASL_SSL
+            sasl-mechanism: PLAIN
+            sasl-jaas-config: 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username={{kafka-service-account-id}} 
password={{kafka-service-account-secret}};"
+      # Log an error message to notify about the failure
       - set-body:
-          constant: "{{message}}"
+          constant: "{{log-message}} - worry not, the event is stored in the 
DLC"
       - to: "log:error-sink"  
diff --git a/examples/kamelets/error-handler/kamelet-binding-error-handler.yaml 
b/examples/kamelets/error-handler/kamelet-binding-error-handler.yaml
index 7a935d6..3d3c3a5 100644
--- a/examples/kamelets/error-handler/kamelet-binding-error-handler.yaml
+++ b/examples/kamelets/error-handler/kamelet-binding-error-handler.yaml
@@ -30,16 +30,7 @@ spec:
       kind: Kamelet
       apiVersion: camel.apache.org/v1alpha1
       name: log-sink
-  errorHandler:   
-#    log:
-#      parameters:
-#        maximumRedeliveries: 3
-#        redeliveryDelay: 2000      
-#    ref: "something"
-#    bean:
-#      type: "org.apache.camel.builder.DeadLetterChannelBuilder"
-#      properties:
-#        deadLetterUri: log:error
+  errorHandler:
     dead-letter-channel:
       endpoint:
         ref:
@@ -47,8 +38,12 @@ spec:
           apiVersion: camel.apache.org/v1alpha1
           name: error-handler
         properties:
-          message: "ERROR!"
+          log-message: "ERROR!"
+          kafka-brokers: ***
+          kafka-topic: my-first-test
+          kafka-service-account-id: ***
+          kafka-service-account-secret: ***
       parameters:
-        maximumRedeliveries: 3
+        maximumRedeliveries: 1
         redeliveryDelay: 2000    
     
diff --git a/examples/kamelets/error-handler/readme.md 
b/examples/kamelets/error-handler/readme.md
index 42293cd..abf8ad6 100644
--- a/examples/kamelets/error-handler/readme.md
+++ b/examples/kamelets/error-handler/readme.md
@@ -1,5 +1,5 @@
 # Kamelets Binding Error Handler example
-This example shows how to create a simple _source_ `Kamelet` which sends 
periodically events (and certain failures). The events are consumed by a log 
_sink_ in a `KameletBinding`. With the support of the `ErrorHandler` we will be 
able to redirect all errors to a `Dead Letter Channel` _error-handler_ 
`Kamelet`.
+This example shows how to create a simple _source_ `Kamelet` which sends 
periodically events (and certain failures). The events are consumed by a log 
_sink_ in a `KameletBinding`. With the support of the `ErrorHandler` we will be 
able to redirect all errors to a `Dead Letter Channel` _error-handler_ 
`Kamelet` whose goal is to store the events in a `Kafka` topic and provide a 
nice log notifying us about the error happened.
 
 ## Incremental ID Source Kamelet
 First of all, you must install the _incremental-id-source_ Kamelet defined in 
`incremental-id-source.kamelet.yaml` file. This source will emit events every 
second with an autoincrement counter that will be forced to fail when the 
number 0 is caught. With this trick, we will simulate possible event faults.
@@ -27,11 +27,51 @@ log-sink                Ready
 incremental-id-source   Ready
 ```
 ## Error handler Kamelet
-We finally install an error handler as specified in 
`error-handler.kamelet.yaml` file. This is a simple logger, but you can use any 
endpoint to collect and store the failing events.
+We finally install an error handler as specified in 
`error-handler.kamelet.yaml` file. Let's have a look at how it is configured:
+
+```
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: error-handler
+spec:
+  definition:
+    ...
+    properties:
+      kafka-brokers:
+        ...  
+      kafka-topic:
+        ...
+      kafka-service-account-id:
+        ...  
+      kafka-service-account-secret:
+        ...              
+      log-message:
+        ...    
+  flow:
+    from:
+      uri: kamelet:source
+      steps:
+      # First step: send to the DLC for future processing
+      - to:
+          uri: kafka:{{kafka-topic}}
+          parameters:
+            brokers: "{{kafka-brokers}}"
+            security-protocol: SASL_SSL
+            sasl-mechanism: PLAIN
+            sasl-jaas-config: 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username={{kafka-service-account-id}} 
password={{kafka-service-account-secret}};"
+      # Log an error message to notify about the failure
+      - set-body:
+          constant: "{{log-message}} - worry not, the event is stored in the 
DLC"
+      - to: "log:error-sink"
+```
+
+We first send the errored event to a kafka topic, and then, we send a simple 
notification message to output, just to let the user know that some issue 
happened. Let's apply it:
+
 ```
 $ kubectl apply -f error-handler.kamelet.yaml
 ```
-You can check the newly created `kamelet` checking the list of kamelets 
available:
+You can check the newly created `kamelet` listing the kamelets available:
 ```
 $ kubectl get kamelets
 
@@ -41,7 +81,7 @@ log-sink                Ready
 incremental-id-source   Ready
 ```
 ## Error Handler Kamelet Binding
-We can create a `KameletBinding` which is started by the 
_incremental-id-source_ `Kamelet` and log events to _log-sink_ `Kamelet`. As 
this will sporadically fail, we can configure an _errorHandler_ with the 
_error-handler_ `Kamelet` as **Dead Letter Channel**. We want to configure also 
some redelivery policies. We can declare it as in 
`kamelet-binding-error-handler.yaml` file:
+We can now create a `KameletBinding` which is started by the 
_incremental-id-source_ `Kamelet` and log events to _log-sink_ `Kamelet`. As 
this will sporadically fail, we can configure an _errorHandler_ with the 
_error-handler_ `Kamelet` as **Dead Letter Channel**. We want to configure also 
some redelivery policies (1 retry, with a 2000 milliseconds delay). We can 
declare it as in `kamelet-binding-error-handler.yaml` file:
 ```
 ...
   errorHandler:
@@ -54,7 +94,7 @@ We can create a `KameletBinding` which is started by the 
_incremental-id-source_
         properties:
           message: "ERROR!"
       parameters:
-        maximumRedeliveries: 3
+        maximumRedeliveries: 1
         redeliveryDelay: 2000
 ```
 Execute the following command to start the `Integration`:
@@ -63,9 +103,25 @@ kubectl apply -f kamelet-binding-error-handler.yaml
 ```
 As soon as the `Integration` starts, it will log the events on the `ok` log 
channel and errors on the `error` log channel:
 ```
-[1] 2021-04-21 13:03:43,773 INFO  [sink] (Camel (camel-1) thread #0 - 
timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: 
Producing message #8]
-[1] 2021-04-21 13:03:44,774 INFO  [sink] (Camel (camel-1) thread #0 - 
timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: 
Producing message #9]
-[1] 2021-04-21 13:03:45,898 INFO  [error-sink] (Camel (camel-1) thread #0 - 
timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: ERROR!]
-[1] 2021-04-21 13:09:46,775 INFO  [sink] (Camel (camel-1) thread #0 - 
timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: 
Producing message #11]
+[1] 2021-04-29 08:35:08,875 INFO  [sink] (Camel (camel-1) thread #0 - 
timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: 
Producing message #49]
+[1] 2021-04-29 08:35:11,878 INFO  [sink] (Camel (camel-1) thread #0 - 
timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: 
Producing message #51]
+[1] 2021-04-29 08:35:12,088 INFO  [error-sink] (Camel (camel-1) thread #9 - 
KafkaProducer[my-first-test]) Exchange[ExchangePattern: InOnly, BodyType: 
String, Body: ERROR! - worry not, the event is stored in the DLC]
+[1] 2021-04-29 08:35:12,877 INFO  [sink] (Camel (camel-1) thread #0 - 
timer://tick) Exchange[ExchangePattern: InOnly, BodyType: String, Body: 
Producing message #52]
 ```
-This example is useful to guide you through the configuration of an error 
handler. In a production environment you will likely configure the error 
handler `Kamelet` pointing to a persistent queue.
\ No newline at end of file
+
+### Recover the errors from DLC
+
+If you're curious to know what was going on in the DLC side, you can use the 
example you found in [kafka sasl ssl consumer](../kafka/sasl_ssl/):
+
+´´´
+kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev
+...
+[1] 2021-04-29 08:57:08,636 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel 
(camel-1) thread #0 - KafkaConsumer[my-first-test]) Kafka commitId: 
448719dc99a19793
+[1] 2021-04-29 08:57:08,636 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel 
(camel-1) thread #0 - KafkaConsumer[my-first-test]) Kafka startTimeMs: 
1619686628635
+[1] 2021-04-29 08:57:08,637 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (Camel 
(camel-1) thread #0 - KafkaConsumer[my-first-test]) Subscribing 
my-first-test-Thread 0 to topic my-first-test
+...
+[1] 2021-04-29 08:35:02,894 INFO  [FromKafka] (Camel (camel-1) thread #0 - 
KafkaConsumer[my-first-test]) Producing message #40
+[1] 2021-04-29 08:35:12,995 INFO  [FromKafka] (Camel (camel-1) thread #0 - 
KafkaConsumer[my-first-test]) Producing message #50
+[1] 2021-04-29 08:35:22,879 INFO  [FromKafka] (Camel (camel-1) thread #0 - 
KafkaConsumer[my-first-test]) Producing message #60
+...
+´´´
\ No newline at end of file

Reply via email to