This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new b3fd13c755e CAMEL-20403: Support Knative Broker reference in Pipe YAML (#13078) b3fd13c755e is described below commit b3fd13c755efd97cf7a316c28f9774ac785c6274 Author: Christoph Deppisch <cdeppi...@redhat.com> AuthorDate: Sat Feb 10 10:07:26 2024 +0100 CAMEL-20403: Support Knative Broker reference in Pipe YAML (#13078) - Pipes may reference a Knative broker as a source/sink - Properly configure the Knative component endpoint URI on the resulting route definition - Make sure to always use Locale.ENGLISH when performing schema validation in YAML DSL unit tests (avoids assertion errors due to internationalized error messages when tests are run on a machine with different default Locale set e.g. GERMAN) --- .../camel/dsl/yaml/YamlRoutesBuilderLoader.java | 31 +++++- .../apache/camel/dsl/yaml/PipeLoaderTest.groovy | 123 ++++++++++++++++++++- .../camel/dsl/yaml/support/YamlTestSupport.groovy | 14 ++- 3 files changed, 158 insertions(+), 10 deletions(-) diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java index 2a9c7ffd224..9b95c5fb9f7 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/main/java/org/apache/camel/dsl/yaml/YamlRoutesBuilderLoader.java @@ -105,7 +105,9 @@ public class YamlRoutesBuilderLoader extends YamlRoutesBuilderLoaderSupport { private static final String BINDING_VERSION = "camel.apache.org/v1alpha1"; private static final String PIPE_VERSION = "camel.apache.org/v1"; private static final String STRIMZI_VERSION = "kafka.strimzi.io/v1"; - private static final String KNATIVE_VERSION = "messaging.knative.dev/v1"; + private static final String KNATIVE_MESSAGING_VERSION = "messaging.knative.dev/v1"; + private static final String KNATIVE_EVENTING_VERSION = "eventing.knative.dev/v1"; + private static final String KNATIVE_EVENT_TYPE = "org.apache.camel.event"; private final Map<String, Boolean> preparseDone = new ConcurrentHashMap<>(); @@ -875,11 +877,17 @@ public class YamlRoutesBuilderLoader extends YamlRoutesBuilderLoaderSupport { boolean strimzi = !kamelet && mn != null && anyTupleMatches(mn.getValue(), "apiVersion", v -> v.startsWith(STRIMZI_VERSION)) && anyTupleMatches(mn.getValue(), "kind", "KafkaTopic"); - boolean knative + boolean knativeBroker + = !kamelet && mn != null + && anyTupleMatches(mn.getValue(), "apiVersion", v -> v.startsWith(KNATIVE_EVENTING_VERSION)) + && anyTupleMatches(mn.getValue(), "kind", "Broker"); + boolean knativeChannel = !kamelet && !strimzi && mn != null - && anyTupleMatches(mn.getValue(), "apiVersion", v -> v.startsWith(KNATIVE_VERSION)); + && anyTupleMatches(mn.getValue(), "apiVersion", v -> v.startsWith(KNATIVE_MESSAGING_VERSION)); String uri; - if (kamelet || strimzi || knative) { + if (knativeBroker) { + uri = KNATIVE_EVENT_TYPE; + } else if (kamelet || strimzi || knativeChannel) { uri = extractTupleValue(mn.getValue(), "name"); } else { uri = extractTupleValue(node.getValue(), "uri"); @@ -888,6 +896,12 @@ public class YamlRoutesBuilderLoader extends YamlRoutesBuilderLoaderSupport { // properties MappingNode prop = asMappingNode(nodeAt(node, "/properties")); Map<String, Object> params = asMap(prop); + + if (knativeBroker && params != null && params.containsKey("type")) { + // Use explicit event type from properties - remove setting from params and set as uri + uri = params.remove("type").toString(); + } + if (params != null && !params.isEmpty()) { String query = URISupport.createQueryString(params); uri = uri + "?" + query; @@ -897,7 +911,14 @@ public class YamlRoutesBuilderLoader extends YamlRoutesBuilderLoaderSupport { return "kamelet:" + uri; } else if (strimzi) { return "kafka:" + uri; - } else if (knative) { + } else if (knativeBroker) { + if (uri.contains("?")) { + uri += "&kind=Broker&name=" + extractTupleValue(mn.getValue(), "name"); + } else { + uri += "?kind=Broker&name=" + extractTupleValue(mn.getValue(), "name"); + } + return "knative:event/" + uri; + } else if (knativeChannel) { return "knative:channel/" + uri; } else { return uri; diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy index da3cbd39252..e917ed7c719 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy @@ -516,7 +516,7 @@ class PipeLoaderTest extends YamlTestSupport { } } - def "Pipe from kamelet to knative"() { + def "Pipe from kamelet to knative channel"() { when: // stub knative for testing as it requires to setup connection to a real knative broker @@ -555,6 +555,127 @@ class PipeLoaderTest extends YamlTestSupport { } } + def "Pipe from knative channel to kamelet"() { + when: + + // stub knative for testing as it requires to setup connection to a real knative broker + context.removeComponent("knative") + context.addComponent("knative", context.getComponent("stub")) + + loadBindings(''' + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: knative-event-source + spec: + source: + ref: + kind: InMemoryChannel + apiVersion: messaging.knative.dev/v1 + name: my-messages + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + showHeaders: true + ''') + then: + context.routeDefinitions.size() == 2 + + with (context.routeDefinitions[0]) { + routeId == 'knative-event-source' + input.endpointUri == 'knative:channel/my-messages' + outputs.size() == 1 + with (outputs[0], ToDefinition) { + endpointUri == 'kamelet:log-sink?showHeaders=true' + } + } + } + + def "Pipe from kamelet to knative broker"() { + when: + + // stub knative for testing as it requires to setup connection to a real knative broker + context.removeComponent("knative") + context.addComponent("knative", context.getComponent("stub")) + + loadBindings(''' + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: timer-event-source + spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + message: "Hello world!" + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: foo-broker + properties: + type: org.apache.camel.event.messages + ''') + then: + context.routeDefinitions.size() == 2 + + with (context.routeDefinitions[0]) { + routeId == 'timer-event-source' + input.endpointUri == 'kamelet:timer-source?message=Hello+world%21' + outputs.size() == 1 + with (outputs[0], ToDefinition) { + endpointUri == 'knative:event/org.apache.camel.event.messages?kind=Broker&name=foo-broker' + } + } + } + + def "Pipe from knative broker to kamelet"() { + when: + + // stub knative for testing as it requires to setup connection to a real knative broker + context.removeComponent("knative") + context.addComponent("knative", context.getComponent("stub")) + + loadBindings(''' + apiVersion: camel.apache.org/v1 + kind: Pipe + metadata: + name: knative-event-source + spec: + source: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: foo-broker + properties: + type: org.apache.camel.event.messages + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + showHeaders: true + ''') + then: + context.routeDefinitions.size() == 2 + + with (context.routeDefinitions[0]) { + routeId == 'knative-event-source' + input.endpointUri == 'knative:event/org.apache.camel.event.messages?kind=Broker&name=foo-broker' + outputs.size() == 1 + with (outputs[0], ToDefinition) { + endpointUri == 'kamelet:log-sink?showHeaders=true' + } + } + } + def "kamelet start route"() { when: loadBindings(''' diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy index c98f84813f4..0114841fe4d 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/support/YamlTestSupport.groovy @@ -18,7 +18,8 @@ package org.apache.camel.dsl.yaml.support import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.dataformat.yaml.YAMLFactory -import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.JsonSchemaFactory +import com.networknt.schema.SchemaValidatorsConfig import com.networknt.schema.SpecVersionDetector import groovy.util.logging.Slf4j import org.apache.camel.CamelContext @@ -40,9 +41,14 @@ import java.nio.charset.StandardCharsets @Slf4j class YamlTestSupport extends Specification implements HasCamelContext { static def MAPPER = new ObjectMapper(new YAMLFactory()) - static def SCHEMA_NODE = MAPPER.readTree(ResourceHelper.getResourceAsStream('/schema/camelYamlDsl.json')); - static def FACTORY = JsonSchemaFactory.getInstance(SpecVersionDetector.detect(SCHEMA_NODE)); - static def SCHEMA = FACTORY.getSchema(SCHEMA_NODE); + static def SCHEMA_NODE = MAPPER.readTree(ResourceHelper.getResourceAsStream('/schema/camelYamlDsl.json')) + static def FACTORY = JsonSchemaFactory.getInstance(SpecVersionDetector.detect(SCHEMA_NODE)) + static def SCHEMA_VALIDATORS_CONFIG = { + SchemaValidatorsConfig config = new SchemaValidatorsConfig() + config.setLocale(Locale.ENGLISH) + return config + }() + static def SCHEMA = FACTORY.getSchema(SCHEMA_NODE, SCHEMA_VALIDATORS_CONFIG) @AutoCleanup def context = new DefaultCamelContext()