This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git


The following commit(s) were added to refs/heads/main by this push:
     new 7341c4d  Adding indexId to ElasticSearch Sink kamelet. Auto-generated 
if coming from kafka and nothing else defined
7341c4d is described below

commit 7341c4d786660b0490db269566367b308d26e3b9
Author: Maria Arias de Reyna <ariasdere...@redhat.com>
AuthorDate: Tue May 11 12:20:55 2021 +0200

    Adding indexId to ElasticSearch Sink kamelet. Auto-generated if coming
    from kafka and nothing else defined
---
 .../ROOT/pages/elasticsearch-index-sink.adoc       | 22 ++++++++++-----
 elasticsearch-index-sink.kamelet.yaml              | 33 ++++++++++++++++++++--
 2 files changed, 45 insertions(+), 10 deletions(-)

diff --git a/docs/modules/ROOT/pages/elasticsearch-index-sink.adoc 
b/docs/modules/ROOT/pages/elasticsearch-index-sink.adoc
index bd5b264..200c717 100644
--- a/docs/modules/ROOT/pages/elasticsearch-index-sink.adoc
+++ b/docs/modules/ROOT/pages/elasticsearch-index-sink.adoc
@@ -3,7 +3,14 @@
 
 *Provided by: "Apache Software Foundation"*
 
-Insert data into ElasticSearch. Input data must have JSON format.
+This sink stores documents into ElasticSearch.
+
+Input data must have JSON format according to the index used.
+
+If the *indexId* parameter is set, that value will be used as the document ID 
on ElasticSearch.
+
+If the *indexId* parameter is not set and the source of the kamelet binding is 
a Kafka broker, it will take the kafka topic, partition and offset of the
+element to generate an automatic ID that warrantees that this element is 
processed only once.
 
 == Configuration Options
 
@@ -11,10 +18,11 @@ The following table summarizes the configuration options 
available for the `elas
 [width="100%",cols="2,^2,3,^2,^2,^3",options="header"]
 |===
 | Property| Name| Description| Type| Default| Example
-| *clusterName {empty}* *| ElasticSearch Cluster Name| Name of the cluster.| 
string| | 
-| *hostAddresses {empty}* *| Host Addresses| Comma separated list with ip:port 
formatted remote transport addresses to use.| string| | 
-| *indexName {empty}* *| Index in ElasticSearch| The name of the index to act 
against.| string| | 
+| *clusterName {empty}* *| ElasticSearch Cluster Name| Name of the cluster.| 
string| | `"quickstart"`
+| *hostAddresses {empty}* *| Host Addresses| Comma separated list with ip:port 
formatted remote transport addresses to use.| string| | 
`"quickstart-es-http:9200"`
+| *indexName {empty}* *| Index in ElasticSearch| The name of the index to act 
against.| string| | `"data"`
 | enableSSL| Enable SSL| Do we want to connect using SSL?| boolean| `true`| 
+| indexId| Index ID| None| string| `"NONE"`| 
 | password| Password| Password to connect to ElasticSearch.| string| | 
 | user| Username| Username to connect to ElasticSearch.| string| | 
 |===
@@ -48,9 +56,9 @@ spec:
       apiVersion: camel.apache.org/v1alpha1
       name: elasticsearch-index-sink
     properties:
-      clusterName: "The ElasticSearch Cluster Name"
-      hostAddresses: "The Host Addresses"
-      indexName: "The Index in ElasticSearch"
+      clusterName: "quickstart"
+      hostAddresses: "quickstart-es-http:9200"
+      indexName: "data"
 
 ----
 
diff --git a/elasticsearch-index-sink.kamelet.yaml 
b/elasticsearch-index-sink.kamelet.yaml
index 56d4dff..41ad431 100755
--- a/elasticsearch-index-sink.kamelet.yaml
+++ b/elasticsearch-index-sink.kamelet.yaml
@@ -21,7 +21,14 @@ spec:
   definition:
     title: "ElasticSearch Index Sink"
     description: |-
-      Insert data into ElasticSearch. Input data must have JSON format.
+      This sink stores documents into ElasticSearch.
+
+      Input data must have JSON format according to the index used.
+
+      If the *indexId* parameter is set, that value will be used as the 
document ID on ElasticSearch.
+
+      If the *indexId* parameter is not set and the source of the kamelet 
binding is a Kafka broker, it will take the kafka topic, partition and offset 
of the
+      element to generate an automatic ID that warrantees that this element is 
processed only once.
     required:
       - clusterName
       - indexName
@@ -42,19 +49,27 @@ spec:
         type: boolean
         default: true
         x-descriptors:
-        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+          - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
       hostAddresses:
         title: Host Addresses
         description: Comma separated list with ip:port formatted remote 
transport addresses to use.
         type: string
+        example: quickstart-es-http:9200
       indexName:
         title: Index in ElasticSearch
         description: The name of the index to act against.
         type: string
+        example: data
       clusterName:
         title: ElasticSearch Cluster Name
         description: Name of the cluster.
         type: string
+        example: quickstart
+      indexId:
+        title: Index ID
+        description: None
+        type: string
+        default: "NONE"
   types:
     out:
       mediaType: application/json
@@ -70,6 +85,18 @@ spec:
     from:
       uri: kamelet:source
       steps:
+        - choice:
+            when:
+              - simple: "'{{indexId}}' == 'NONE' && ${header[kafka.TOPIC]} != 
null"
+                steps:
+                  - set-header:
+                      name: "indexId"
+                      simple: 
"${header[kafka.TOPIC]}${header[kafka.PARTITION]}${header[kafka.OFFSET]}"
+              - simple: "'{{indexId}}' != 'NONE'"
+                steps:
+                  - set-header:
+                      name: "indexId"
+                      simple: "{{indexId}}"
         - to:
             uri: "kamelet-reify:elasticsearch-rest:{{clusterName}}"
             parameters:
@@ -80,4 +107,4 @@ spec:
               user: "{{user}}"
               password: "{{password}}"
         - marshal:
-            json: {}
+            json: { }

Reply via email to