This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push: new f3453ff Wrong CloudEvent Headers used #173 new 34c792b Merge pull request #174 from lburgazzoli/github-173 f3453ff is described below commit f3453ffc757f592789026cf24804f9a64c5bc74a Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Fri Oct 25 01:37:08 2019 +0200 Wrong CloudEvent Headers used #173 --- .../camel/component/knative/spi/CloudEvent.java | 15 ++ .../camel/component/knative/spi/CloudEvents.java | 50 ++-- .../component/knative/http/KnativeHttpTest.java | 261 +++++++++++++-------- .../camel/component/knative/KnativeEndpoint.java | 3 +- .../knative/ce/AbstractCloudEventProcessor.java | 36 ++- .../component/knative/ce/CloudEventProcessors.java | 12 +- 6 files changed, 224 insertions(+), 153 deletions(-) diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java index df17494..bc92f38 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java @@ -20,7 +20,22 @@ import java.util.Collection; import java.util.Objects; import java.util.Optional; +import org.apache.camel.Exchange; + public interface CloudEvent { + String CAMEL_CLOUD_EVENT_ID = "CamelCloudEventID"; + String CAMEL_CLOUD_EVENT_SOURCE = "CamelCloudEventSource"; + String CAMEL_CLOUD_EVENT_VERSION = "CamelCloudEventVersion"; + String CAMEL_CLOUD_EVENT_TYPE = "CamelCloudEventType"; + String CAMEL_CLOUD_EVENT_TYPE_VERSION = "CamelCloudEventTypeVersion"; + String CAMEL_CLOUD_EVENT_DATA_CONTENT_TYPE = "CamelCloudEventDataContentType"; + String CAMEL_CLOUD_EVENT_DATA_CONTENT_ENCODING = "CamelCloudEventDataContentEncoding"; + String CAMEL_CLOUD_EVENT_SCHEMA_URL = "CamelCloudEventSchemaURL"; + String CAMEL_CLOUD_EVENT_SUBJECT = "CamelCloudEventSubject"; + String CAMEL_CLOUD_EVENT_TIME = "CamelCloudEventTime"; + String CAMEL_CLOUD_EVENT_EXTENSIONS = "CamelCloudEventExtensions"; + String CAMEL_CLOUD_EVENT_CONTENT_TYPE = Exchange.CONTENT_TYPE; + /** * The CloudEvent spec version. */ diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java index 7d7e7a9..dd8ab96 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java @@ -27,15 +27,15 @@ public enum CloudEvents implements CloudEvent { V01(new CloudEventImpl( "0.1", Arrays.asList( - Attribute.simple("type", "CE-EventType", "eventType"), - Attribute.simple("type.version", "CE-EventTypeVersion", "eventTypeVersion"), - Attribute.simple("version", "CE-CloudEventsVersion", "cloudEventsVersion"), - Attribute.simple("source", "CE-Source", "source"), - Attribute.simple("id", "CE-EventID", "eventID"), - Attribute.simple("time", "CE-EventTime", "eventTime"), - Attribute.simple("schema.url", "CE-SchemaURL", "schemaURL"), - Attribute.simple("content.type", "ContentType", "contentType"), - Attribute.simple("extensions", "CE-Extensions", "extensions") + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "CE-EventType", "eventType"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TYPE_VERSION, "CE-EventTypeVersion", "eventTypeVersion"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, "CE-CloudEventsVersion", "cloudEventsVersion"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE-Source", "source"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_ID, "CE-EventID", "eventID"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TIME, "CE-EventTime", "eventTime"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SCHEMA_URL, "CE-SchemaURL", "schemaURL"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE, "ContentType", "contentType"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_EXTENSIONS, "CE-Extensions", "extensions") ) )), // @@ -44,13 +44,13 @@ public enum CloudEvents implements CloudEvent { V02(new CloudEventImpl( "0.2", Arrays.asList( - Attribute.simple("type", "ce-type", "type"), - Attribute.simple("version", "ce-specversion", "specversion"), - Attribute.simple("source", "ce-source", "source"), - Attribute.simple("id", "ce-id", "id"), - Attribute.simple("time", "ce-time", "time"), - Attribute.simple("schema.url", "ce-schemaurl", "schemaurl"), - Attribute.simple("content.type", "Content-Type", "contenttype") + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "ce-type", "type"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, "ce-specversion", "specversion"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "ce-source", "source"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_ID, "ce-id", "id"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TIME, "ce-time", "time"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SCHEMA_URL, "ce-schemaurl", "schemaurl"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE, "Content-Type", "contenttype") ) )), // @@ -59,15 +59,15 @@ public enum CloudEvents implements CloudEvent { V03(new CloudEventImpl( "0.3", Arrays.asList( - Attribute.simple("id", "ce-id", "id"), - Attribute.simple("source", "ce-source", "source"), - Attribute.simple("version", "ce-specversion", "specversion"), - Attribute.simple("type", "ce-type", "type"), - Attribute.simple("data.content.encoding", "ce-datacontentencoding", "datacontentencoding"), - Attribute.simple("data.content.type", "ce-datacontenttype", "datacontenttype"), - Attribute.simple("schema.url", "ce-schemaurl", "schemaurl"), - Attribute.simple("subject", "ce-subject", "subject"), - Attribute.simple("time", "ce-time", "time") + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_ID, "ce-id", "id"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "ce-source", "source"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, "ce-specversion", "specversion"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "ce-type", "type"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_DATA_CONTENT_ENCODING, "ce-datacontentencoding", "datacontentencoding"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_DATA_CONTENT_TYPE, "ce-datacontenttype", "datacontenttype"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SCHEMA_URL, "ce-schemaurl", "schemaurl"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SUBJECT, "ce-subject", "subject"), + Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TIME, "ce-time", "time") ) )); diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index 6e0eb8d..e9f13df 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -22,11 +22,15 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; import com.fasterxml.jackson.databind.ObjectMapper; import io.undertow.Undertow; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.HeaderMap; import org.apache.camel.CamelContext; import org.apache.camel.CamelException; import org.apache.camel.Exchange; @@ -136,12 +140,12 @@ public class KnativeHttpTest { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "knative://endpoint/myEndpoint"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "knative://endpoint/myEndpoint"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("id").id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -181,12 +185,12 @@ public class KnativeHttpTest { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID"); - mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "/somewhere"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "/somewhere"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -266,12 +270,12 @@ public class KnativeHttpTest { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID"); - mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "/somewhere"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "/somewhere"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -279,11 +283,11 @@ public class KnativeHttpTest { "direct:source", e -> { e.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain"); - e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID"); - e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "/somewhere"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere"); e.getMessage().setBody("test"); } ); @@ -305,7 +309,7 @@ public class KnativeHttpTest { KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE1" + Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1" )), endpoint( Knative.EndpointKind.source, @@ -315,7 +319,7 @@ public class KnativeHttpTest { KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE2" + Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2" )) ); @@ -343,41 +347,41 @@ public class KnativeHttpTest { context.start(); MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); - mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); - mock1.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); - mock1.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - mock1.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID1"); - mock1.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE1"); + mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); + mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); + mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); + mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID1"); + mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE1"); mock1.expectedBodiesReceived("test"); mock1.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); - mock2.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); - mock2.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - mock2.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID2"); - mock2.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE2"); + mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); + mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); + mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); + mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID2"); + mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE2"); mock2.expectedBodiesReceived("test"); mock2.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", e -> { - e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID1"); - e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE1"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1"); } ); context.createProducerTemplate().send( "direct:source", e -> { - e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID2"); - e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE2"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2"); } ); @@ -399,7 +403,7 @@ public class KnativeHttpTest { KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE[01234]" + Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE[01234]" )), endpoint( Knative.EndpointKind.source, @@ -409,7 +413,7 @@ public class KnativeHttpTest { KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE[56789]" + Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE[56789]" )) ); @@ -437,41 +441,41 @@ public class KnativeHttpTest { context.start(); MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); - mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); - mock1.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); - mock1.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - mock1.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID1"); - mock1.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE0"); + mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); + mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); + mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); + mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID1"); + mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE0"); mock1.expectedBodiesReceived("test"); mock1.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); - mock2.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); - mock2.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - mock2.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID2"); - mock2.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE5"); + mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); + mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); + mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); + mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID2"); + mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE5"); mock2.expectedBodiesReceived("test"); mock2.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", e -> { - e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID1"); - e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE0"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE0"); } ); context.createProducerTemplate().send( "direct:source", e -> { - e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID2"); - e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE5"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE5"); } ); @@ -516,41 +520,41 @@ public class KnativeHttpTest { context.start(); MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); - mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); - mock1.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); - mock1.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "event1"); - mock1.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID1"); - mock1.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE1"); + mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); + mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); + mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "event1"); + mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID1"); + mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE1"); mock1.expectedBodiesReceived("test"); mock1.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); - mock2.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); - mock2.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "event2"); - mock2.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID2"); - mock2.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE2"); + mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); + mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); + mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "event2"); + mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID2"); + mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE2"); mock2.expectedBodiesReceived("test"); mock2.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", e -> { - e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "event1"); - e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID1"); - e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE1"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event1"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1"); } ); context.createProducerTemplate().send( "direct:source", e -> { - e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "event2"); - e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID2"); - e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE2"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event2"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2"); } ); @@ -865,11 +869,11 @@ public class KnativeHttpTest { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "myEvent"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "myEvent"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("id").id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id())); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -922,11 +926,11 @@ public class KnativeHttpTest { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "myEvent"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "myEvent"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("id").id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id())); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -978,12 +982,12 @@ public class KnativeHttpTest { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID"); - mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "/somewhere"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "/somewhere"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id())); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -991,11 +995,11 @@ public class KnativeHttpTest { "direct:source", e -> { e.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain"); - e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); - e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID"); - e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "/somewhere"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID"); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere"); e.getMessage().setBody("test"); } ); @@ -1191,5 +1195,58 @@ public class KnativeHttpTest { assertThat(result.getMessage().getBody()).isEqualTo(definition.getName()); } } + + @ParameterizedTest + @MethodSource("provideCloudEventsImplementations") + void testHeaders(CloudEvent ce) throws Exception { + configureKnativeComponent( + context, + ce, + endpoint( + Knative.EndpointKind.sink, + "ep", + "localhost", + port, + KnativeSupport.mapOf( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + ) + ) + ); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference<HttpServerExchange> exchange = new AtomicReference<>(); + + Undertow server = Undertow.builder() + .addHttpListener(port, "localhost") + .setHandler(se -> { + exchange.set(se); + latch.countDown(); + }) + .build(); + + RouteBuilder.addRoutes(context, b -> { + b.from("direct:start") + .to("knative:endpoint/ep"); + }); + + context.start(); + try { + server.start(); + template.sendBody("direct:start", ""); + + latch.await(); + + HeaderMap headers = exchange.get().getRequestHeaders(); + + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("org.apache.camel.event"); + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); + assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep"); + assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { + server.stop(); + } + } } diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java index 087286d..d06adf9 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java @@ -26,6 +26,7 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.knative.ce.CloudEventProcessor; import org.apache.camel.component.knative.ce.CloudEventProcessors; +import org.apache.camel.component.knative.spi.CloudEvent; import org.apache.camel.component.knative.spi.Knative; import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.processor.Pipeline; @@ -159,7 +160,7 @@ public class KnativeEndpoint extends DefaultEndpoint { if (service.get().getType() == Knative.Type.event) { metadata.put(Knative.KNATIVE_EVENT_TYPE, serviceName); - metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().mandatoryAttribute("type").id(), serviceName); + metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), serviceName); } return new KnativeEnvironment.KnativeServiceDefinition( diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java index 5e0ca0d..6d1af26 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java @@ -51,21 +51,19 @@ abstract class AbstractCloudEventProcessor implements CloudEventProcessor { } if (!Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_STRUCTURED_CONTENT_MODE)) { - // - // The event is not in the form of Structured Content Mode - // then leave it as it is. - // - // Note that this is true for http binding only. - // - // More info: - // - // https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode - // - return; - } + final CloudEvent ce = cloudEvent(); + final Map<String, Object> headers = exchange.getIn().getHeaders(); - try (InputStream is = exchange.getIn().getBody(InputStream.class)) { - decodeStructuredContent(exchange, Knative.MAPPER.readValue(is, Map.class)); + for (CloudEvent.Attribute attribute: ce.attributes()) { + Object val = headers.remove(attribute.http()); + if (val != null) { + headers.put(attribute.id(), val); + } + } + } else { + try (InputStream is = exchange.getIn().getBody(InputStream.class)) { + decodeStructuredContent(exchange, Knative.MAPPER.readValue(is, Map.class)); + } } }; } @@ -87,11 +85,11 @@ abstract class AbstractCloudEventProcessor implements CloudEventProcessor { final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created); final Map<String, Object> headers = exchange.getIn().getHeaders(); - headers.putIfAbsent(ce.mandatoryAttribute("id").id(), exchange.getExchangeId()); - headers.putIfAbsent(ce.mandatoryAttribute("source").id(), endpoint.getEndpointUri()); - headers.putIfAbsent(ce.mandatoryAttribute("version").id(), ce.version()); - headers.putIfAbsent(ce.mandatoryAttribute("type").id(), eventType); - headers.putIfAbsent(ce.mandatoryAttribute("time").id(), eventTime); + headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), exchange.getExchangeId()); + headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), endpoint.getEndpointUri()); + headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); + headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), eventType); + headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), eventTime); headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); }; } diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java index a91a64d..2d5f499 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java @@ -41,14 +41,14 @@ public enum CloudEventProcessors implements CloudEventProcessor { // body ifNotEmpty(content.remove("data"), message::setBody); - ifNotEmpty(content.remove(ce.mandatoryAttribute("content.type").json()), val -> { + ifNotEmpty(content.remove(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE).json()), val -> { message.setHeader(Exchange.CONTENT_TYPE, val); }); // // Map extensions to standard camel headers // - ifNotEmpty(content.remove("extensions"), val -> { + ifNotEmpty(content.remove(CloudEvent.CAMEL_CLOUD_EVENT_EXTENSIONS), val -> { if (val instanceof Map) { ((Map<String, Object>) val).forEach(message::setHeader); } @@ -70,7 +70,7 @@ public enum CloudEventProcessors implements CloudEventProcessor { // body ifNotEmpty(content.remove("data"), message::setBody); - ifNotEmpty(content.remove(ce.mandatoryAttribute("content.type").json()), val -> { + ifNotEmpty(content.remove(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE).json()), val -> { message.setHeader(Exchange.CONTENT_TYPE, val); }); @@ -98,11 +98,11 @@ public enum CloudEventProcessors implements CloudEventProcessor { // body ifNotEmpty(content.remove("data"), message::setBody); - ifNotEmpty(content.remove(ce.mandatoryAttribute("data.content.type").json()), val -> { + ifNotEmpty(content.remove(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_DATA_CONTENT_TYPE).json()), val -> { message.setHeader(Exchange.CONTENT_TYPE, val); }); - ifNotEmpty(content.remove(ce.mandatoryAttribute("data.content.encoding").json()), val -> { - message.setBody(val); + ifNotEmpty(content.remove(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_DATA_CONTENT_ENCODING).json()), val -> { + message.setHeader(Exchange.CONTENT_ENCODING, val); }); for (CloudEvent.Attribute attribute: ce.attributes()) {