This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new b3fd13c755e CAMEL-20403: Support Knative Broker reference in Pipe YAML (#13078)
b3fd13c755e is described below

commit b3fd13c755efd97cf7a316c28f9774ac785c6274
Author: Christoph Deppisch <cdeppi...@redhat.com>
AuthorDate: Sat Feb 10 10:07:26 2024 +0100

    CAMEL-20403: Support Knative Broker reference in Pipe YAML (#13078)
    
    - Pipes may reference a Knative broker as a source/sink
    - Properly configure the Knative component endpoint URI on the resulting route definition
    - Make sure to always use Locale.ENGLISH when performing schema validation in YAML DSL unit tests (avoids assertion errors due to internationalized error messages when tests are run on a machine with a different default Locale set, e.g. GERMAN)
---
 .../camel/dsl/yaml/YamlRoutesBuilderLoader.java    |  31 +++++-
 .../apache/camel/dsl/yaml/PipeLoaderTest.groovy    | 123 ++++++++++++++++++++-
 .../camel/dsl/yaml/support/YamlTestSupport.groovy  |  14 ++-
 3 files changed, 158 insertions(+), 10 deletions(-)

diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java
 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java
index 2a9c7ffd224..9b95c5fb9f7 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java
@@ -105,7 +105,9 @@ public class YamlRoutesBuilderLoader extends 
YamlRoutesBuilderLoaderSupport {
     private static final String BINDING_VERSION = "camel.apache.org/v1alpha1";
     private static final String PIPE_VERSION = "camel.apache.org/v1";
     private static final String STRIMZI_VERSION = "kafka.strimzi.io/v1";
-    private static final String KNATIVE_VERSION = "messaging.knative.dev/v1";
+    private static final String KNATIVE_MESSAGING_VERSION = 
"messaging.knative.dev/v1";
+    private static final String KNATIVE_EVENTING_VERSION = 
"eventing.knative.dev/v1";
+    private static final String KNATIVE_EVENT_TYPE = "org.apache.camel.event";
 
     private final Map<String, Boolean> preparseDone = new 
ConcurrentHashMap<>();
 
@@ -875,11 +877,17 @@ public class YamlRoutesBuilderLoader extends 
YamlRoutesBuilderLoaderSupport {
         boolean strimzi
                 = !kamelet && mn != null && anyTupleMatches(mn.getValue(), 
"apiVersion", v -> v.startsWith(STRIMZI_VERSION))
                         && anyTupleMatches(mn.getValue(), "kind", 
"KafkaTopic");
-        boolean knative
+        boolean knativeBroker
+                = !kamelet && mn != null
+                        && anyTupleMatches(mn.getValue(), "apiVersion", v -> 
v.startsWith(KNATIVE_EVENTING_VERSION))
+                        && anyTupleMatches(mn.getValue(), "kind", "Broker");
+        boolean knativeChannel
                 = !kamelet && !strimzi && mn != null
-                        && anyTupleMatches(mn.getValue(), "apiVersion", v -> 
v.startsWith(KNATIVE_VERSION));
+                        && anyTupleMatches(mn.getValue(), "apiVersion", v -> 
v.startsWith(KNATIVE_MESSAGING_VERSION));
         String uri;
-        if (kamelet || strimzi || knative) {
+        if (knativeBroker) {
+            uri = KNATIVE_EVENT_TYPE;
+        } else if (kamelet || strimzi || knativeChannel) {
             uri = extractTupleValue(mn.getValue(), "name");
         } else {
             uri = extractTupleValue(node.getValue(), "uri");
@@ -888,6 +896,12 @@ public class YamlRoutesBuilderLoader extends 
YamlRoutesBuilderLoaderSupport {
         // properties
         MappingNode prop = asMappingNode(nodeAt(node, "/properties"));
         Map<String, Object> params = asMap(prop);
+
+        if (knativeBroker && params != null && params.containsKey("type")) {
+            // Use explicit event type from properties - remove setting from 
params and set as uri
+            uri = params.remove("type").toString();
+        }
+
         if (params != null && !params.isEmpty()) {
             String query = URISupport.createQueryString(params);
             uri = uri + "?" + query;
@@ -897,7 +911,14 @@ public class YamlRoutesBuilderLoader extends 
YamlRoutesBuilderLoaderSupport {
             return "kamelet:" + uri;
         } else if (strimzi) {
             return "kafka:" + uri;
-        } else if (knative) {
+        } else if (knativeBroker) {
+            if (uri.contains("?")) {
+                uri += "&kind=Broker&name=" + extractTupleValue(mn.getValue(), 
"name");
+            } else {
+                uri += "?kind=Broker&name=" + extractTupleValue(mn.getValue(), 
"name");
+            }
+            return "knative:event/" + uri;
+        } else if (knativeChannel) {
             return "knative:channel/" + uri;
         } else {
             return uri;
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy
 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy
index da3cbd39252..e917ed7c719 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy
@@ -516,7 +516,7 @@ class PipeLoaderTest extends YamlTestSupport {
         }
     }
 
-    def "Pipe from kamelet to knative"() {
+    def "Pipe from kamelet to knative channel"() {
         when:
 
         // stub knative for testing as it requires to setup connection to a 
real knative broker
@@ -555,6 +555,127 @@ class PipeLoaderTest extends YamlTestSupport {
         }
     }
 
+    def "Pipe from knative channel to kamelet"() {
+        when:
+
+        // stub knative for testing as it requires to setup connection to a 
real knative broker
+        context.removeComponent("knative")
+        context.addComponent("knative", context.getComponent("stub"))
+
+        loadBindings('''
+                apiVersion: camel.apache.org/v1
+                kind: Pipe
+                metadata:
+                  name: knative-event-source                  
+                spec:
+                  source:
+                    ref:
+                      kind: InMemoryChannel
+                      apiVersion: messaging.knative.dev/v1
+                      name: my-messages
+                  sink:
+                    ref:
+                      kind: Kamelet
+                      apiVersion: camel.apache.org/v1
+                      name: log-sink
+                    properties:
+                      showHeaders: true
+            ''')
+        then:
+        context.routeDefinitions.size() == 2
+
+        with (context.routeDefinitions[0]) {
+            routeId == 'knative-event-source'
+            input.endpointUri == 'knative:channel/my-messages'
+            outputs.size() == 1
+            with (outputs[0], ToDefinition) {
+                endpointUri == 'kamelet:log-sink?showHeaders=true'
+            }
+        }
+    }
+
+    def "Pipe from kamelet to knative broker"() {
+        when:
+
+        // stub knative for testing as it requires to setup connection to a 
real knative broker
+        context.removeComponent("knative")
+        context.addComponent("knative", context.getComponent("stub"))
+
+        loadBindings('''
+                apiVersion: camel.apache.org/v1
+                kind: Pipe
+                metadata:
+                  name: timer-event-source                  
+                spec:
+                  source:
+                    ref:
+                      kind: Kamelet
+                      apiVersion: camel.apache.org/v1
+                      name: timer-source
+                    properties:
+                      message: "Hello world!"
+                  sink:
+                    ref:
+                      kind: Broker
+                      apiVersion: eventing.knative.dev/v1
+                      name: foo-broker
+                    properties:
+                      type: org.apache.camel.event.messages  
+            ''')
+        then:
+        context.routeDefinitions.size() == 2
+
+        with (context.routeDefinitions[0]) {
+            routeId == 'timer-event-source'
+            input.endpointUri == 'kamelet:timer-source?message=Hello+world%21'
+            outputs.size() == 1
+            with (outputs[0], ToDefinition) {
+                endpointUri == 
'knative:event/org.apache.camel.event.messages?kind=Broker&name=foo-broker'
+            }
+        }
+    }
+
+    def "Pipe from knative broker to kamelet"() {
+        when:
+
+        // stub knative for testing as it requires to setup connection to a 
real knative broker
+        context.removeComponent("knative")
+        context.addComponent("knative", context.getComponent("stub"))
+
+        loadBindings('''
+                apiVersion: camel.apache.org/v1
+                kind: Pipe
+                metadata:
+                  name: knative-event-source                  
+                spec:
+                  source:
+                    ref:
+                      kind: Broker
+                      apiVersion: eventing.knative.dev/v1
+                      name: foo-broker
+                    properties:
+                      type: org.apache.camel.event.messages  
+                  sink:
+                    ref:
+                      kind: Kamelet
+                      apiVersion: camel.apache.org/v1
+                      name: log-sink
+                    properties:
+                      showHeaders: true
+            ''')
+        then:
+        context.routeDefinitions.size() == 2
+
+        with (context.routeDefinitions[0]) {
+            routeId == 'knative-event-source'
+            input.endpointUri == 
'knative:event/org.apache.camel.event.messages?kind=Broker&name=foo-broker'
+            outputs.size() == 1
+            with (outputs[0], ToDefinition) {
+                endpointUri == 'kamelet:log-sink?showHeaders=true'
+            }
+        }
+    }
+
     def "kamelet start route"() {
         when:
         loadBindings('''
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy
 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy
index c98f84813f4..0114841fe4d 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy
@@ -18,7 +18,8 @@ package org.apache.camel.dsl.yaml.support
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.JsonSchemaFactory
+import com.networknt.schema.SchemaValidatorsConfig
 import com.networknt.schema.SpecVersionDetector
 import groovy.util.logging.Slf4j
 import org.apache.camel.CamelContext
@@ -40,9 +41,14 @@ import java.nio.charset.StandardCharsets
 @Slf4j
 class YamlTestSupport extends Specification implements HasCamelContext {
     static def MAPPER = new ObjectMapper(new YAMLFactory())
-    static def SCHEMA_NODE = 
MAPPER.readTree(ResourceHelper.getResourceAsStream('/schema/camelYamlDsl.json'));
-    static def FACTORY = 
JsonSchemaFactory.getInstance(SpecVersionDetector.detect(SCHEMA_NODE));
-    static def SCHEMA = FACTORY.getSchema(SCHEMA_NODE);
+    static def SCHEMA_NODE = 
MAPPER.readTree(ResourceHelper.getResourceAsStream('/schema/camelYamlDsl.json'))
+    static def FACTORY = 
JsonSchemaFactory.getInstance(SpecVersionDetector.detect(SCHEMA_NODE))
+    static def SCHEMA_VALIDATORS_CONFIG = {
+        SchemaValidatorsConfig config = new SchemaValidatorsConfig()
+        config.setLocale(Locale.ENGLISH)
+        return config
+    }()
+    static def SCHEMA = FACTORY.getSchema(SCHEMA_NODE, 
SCHEMA_VALIDATORS_CONFIG)
 
     @AutoCleanup
     def context = new DefaultCamelContext()

Reply via email to