This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push: new 1cd9bf7 Provide support for statically defined cloudevent headers #175 new 0f244a6 Merge pull request #176 from lburgazzoli/github-175 1cd9bf7 is described below commit 1cd9bf7213a3992d677877bb3da4c2b8ce730547 Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Fri Oct 25 17:28:47 2019 +0200 Provide support for statically defined cloudevent headers #175 --- .../camel/component/knative/spi/Knative.java | 1 + .../component/knative/spi/KnativeEnvironment.java | 3 +- .../component/knative/http/KnativeHttpTest.java | 186 +++++++++++++++++++++ .../camel/component/knative/KnativeComponent.java | 6 + .../component/knative/KnativeConfiguration.java | 13 ++ .../camel/component/knative/KnativeEndpoint.java | 13 ++ .../knative/ce/AbstractCloudEventProcessor.java | 9 + 7 files changed, 230 insertions(+), 1 deletion(-) diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java index 65c2393..4d52932 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java @@ -24,6 +24,7 @@ public final class Knative { public static final String KNATIVE_TRANSPORT_RESOURCE_PATH = "META-INF/services/org/apache/camel/knative/transport/"; public static final String KNATIVE_FILTER_PREFIX = "filter."; + public static final String KNATIVE_CE_OVERRIDE_PREFIX = "ce.override."; public static final String KNATIVE_TYPE = "knative.type"; public static final String KNATIVE_EVENT_TYPE = "knative.event.type"; public static final String KNATIVE_KIND = "knative.kind"; diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java 
b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java index f74fb5b..1702ae2 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java @@ -80,7 +80,7 @@ public class KnativeEnvironment { // { // "services": [ // { - // "type": "channel|endpoint", + // "type": "channel|endpoint|event", // "name": "", // "host": "", // "port": "", @@ -91,6 +91,7 @@ public class KnativeEnvironment { // "knative.kind": "", // "knative.apiVersion": "", // "camel.endpoint.kind": "source|sink", + // "ce.override.ce-type": "something", // } // }, // ] diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index e9f13df..9628838 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Random; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -1248,5 +1249,190 @@ public class KnativeHttpTest { server.stop(); } } + + @ParameterizedTest + @MethodSource("provideCloudEventsImplementations") + void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception { + final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); + final String typeHeaderVal = UUID.randomUUID().toString(); + final String sourceHeaderKey = 
ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(); + final String sourceHeaderVal = UUID.randomUUID().toString(); + + configureKnativeComponent( + context, + ce, + endpoint( + Knative.EndpointKind.sink, + "ep", + "localhost", + port, + KnativeSupport.mapOf( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain", + Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal, + Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal + ) + ) + ); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference<HttpServerExchange> exchange = new AtomicReference<>(); + + Undertow server = Undertow.builder() + .addHttpListener(port, "localhost") + .setHandler(se -> { + exchange.set(se); + latch.countDown(); + }) + .build(); + + RouteBuilder.addRoutes(context, b -> { + b.from("direct:start") + .to("knative:endpoint/ep"); + }); + + context.start(); + try { + server.start(); + template.sendBody("direct:start", ""); + + latch.await(); + + HeaderMap headers = exchange.get().getRequestHeaders(); + + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal); + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal); + assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { + server.stop(); + } + } + + @ParameterizedTest + @MethodSource("provideCloudEventsImplementations") + void testHeadersOverrideFromURI(CloudEvent ce) throws Exception { + final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); + final String typeHeaderVal = UUID.randomUUID().toString(); + final String 
sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(); + final String sourceHeaderVal = UUID.randomUUID().toString(); + + configureKnativeComponent( + context, + ce, + endpoint( + Knative.EndpointKind.sink, + "ep", + "localhost", + port, + KnativeSupport.mapOf( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + ) + ) + ); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference<HttpServerExchange> exchange = new AtomicReference<>(); + + Undertow server = Undertow.builder() + .addHttpListener(port, "localhost") + .setHandler(se -> { + exchange.set(se); + latch.countDown(); + }) + .build(); + + RouteBuilder.addRoutes(context, b -> { + b.from("direct:start") + .toF("knative:endpoint/ep?%s=%s&%s=%s", + Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal, + Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal); + }); + + context.start(); + try { + server.start(); + template.sendBody("direct:start", ""); + + latch.await(); + + HeaderMap headers = exchange.get().getRequestHeaders(); + + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal); + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal); + assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { + server.stop(); + } + } + + + + @ParameterizedTest + @MethodSource("provideCloudEventsImplementations") + void testHeadersOverrideFromConf(CloudEvent ce) throws Exception { + final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); + final String typeHeaderVal = 
UUID.randomUUID().toString(); + final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(); + final String sourceHeaderVal = UUID.randomUUID().toString(); + + KnativeComponent component = configureKnativeComponent( + context, + ce, + endpoint( + Knative.EndpointKind.sink, + "ep", + "localhost", + port, + KnativeSupport.mapOf( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + ) + ) + ); + + component.getConfiguration().setCeOverride(KnativeSupport.mapOf( + Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal, + Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal + )); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference<HttpServerExchange> exchange = new AtomicReference<>(); + + Undertow server = Undertow.builder() + .addHttpListener(port, "localhost") + .setHandler(se -> { + exchange.set(se); + latch.countDown(); + }) + .build(); + + RouteBuilder.addRoutes(context, b -> { + b.from("direct:start") + .to("knative:endpoint/ep"); + }); + + context.start(); + try { + server.start(); + template.sendBody("direct:start", ""); + + latch.await(); + + HeaderMap headers = exchange.get().getRequestHeaders(); + + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal); + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal); + assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { + server.stop(); + } + } } diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java 
b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java index 3cb34c2..6458c09 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java @@ -226,6 +226,9 @@ public class KnativeComponent extends DefaultComponent { conf.getTransportOptions().putAll( PropertiesHelper.extractProperties(parameters, "transport.", true) ); + conf.getCeOverride().putAll( + PropertiesHelper.extractProperties(parameters, "ce.override.", true) + ); // set properties from the endpoint uri PropertyBindingSupport.bindProperties(getCamelContext(), conf, parameters); @@ -252,6 +255,9 @@ public class KnativeComponent extends DefaultComponent { if (conf.getFilters() == null) { conf.setFilters(new HashMap<>()); } + if (conf.getCeOverride() == null) { + conf.setCeOverride(new HashMap<>()); + } if (conf.getEnvironment() == null) { String envConfig = System.getenv(CONFIGURATION_ENV_VARIABLE); diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java index 277f33c..eb93fb7 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java @@ -40,6 +40,8 @@ public class KnativeConfiguration implements Cloneable { private Map<String, Object> transportOptions; @UriParam(prefix = "filter.") private Map<String, Object> filters; + @UriParam(prefix = "ce.override.") + private Map<String, Object> ceOverride; @UriParam(label = "advanced") private String apiVersion; @UriParam(label = "advanced") @@ -131,6 +133,17 @@ public class KnativeConfiguration implements Cloneable { this.filters = filters; } 
+ public Map<String, Object> getCeOverride() { + return ceOverride; + } + + /** + * CloudEvent headers to override + */ + public void setCeOverride(Map<String, Object> ceOverride) { + this.ceOverride = ceOverride; + } + public String getApiVersion() { return apiVersion; } diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java index d06adf9..a9c75d5 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java @@ -158,6 +158,19 @@ public class KnativeEndpoint extends DefaultEndpoint { } } + for (Map.Entry<String, Object> entry: configuration.getCeOverride().entrySet()) { + String key = entry.getKey(); + Object val = entry.getValue(); + + if (val instanceof String) { + if (!key.startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) { + key = Knative.KNATIVE_CE_OVERRIDE_PREFIX + key; + } + + metadata.put(key, (String)val); + } + } + if (service.get().getType() == Knative.Type.event) { metadata.put(Knative.KNATIVE_EVENT_TYPE, serviceName); metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), serviceName); diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java index 6d1af26..4cd638a 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java @@ -91,6 +91,15 @@ abstract class AbstractCloudEventProcessor implements CloudEventProcessor 
{ headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), eventType); headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), eventTime); headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); + + for (Map.Entry<String, String> entry: service.getMetadata().entrySet()) { + if (entry.getKey().startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) { + final String key = entry.getKey().substring(Knative.KNATIVE_CE_OVERRIDE_PREFIX.length()); + final String val = entry.getValue(); + + headers.put(key, val); + } + } }; } }