This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push: new b72dc22 camel-knative: don't replace message when the producers completes #399 b72dc22 is described below commit b72dc22bd925e07c598ea512078b3f492b45534f Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Thu Jul 16 11:17:39 2020 +0200 camel-knative: don't replace message when the producers completes #399 --- .../knative/http/KnativeHttpConsumer.java | 5 + .../knative/http/KnativeHttpProducer.java | 9 +- .../component/knative/http/KnativeHttpSupport.java | 51 +-- .../knative/http/KnativeHttpTransport.java | 8 +- .../component/knative/http/KnativeHttpServer.java | 33 +- .../component/knative/http/KnativeHttpTest.java | 507 +++++++++++---------- .../knative/http/KnativeHttpTestSupport.java | 4 + .../http/assertions/HttpServerRequestAssert.java | 66 +++ .../component/knative/KnativeConfiguration.java | 6 +- 9 files changed, 405 insertions(+), 284 deletions(-) diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java index fc83036..fb60431 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java @@ -287,6 +287,11 @@ public class KnativeHttpConsumer extends DefaultConsumer { } } } + + KnativeHttpSupport.remapCloudEventHeaders(configuration.getCloudEvent(), message); + if (configuration.isRemoveCloudEventHeadersInReply()) { + KnativeHttpSupport.removeCloudEventHeaders(configuration.getCloudEvent(), message); + } } return response; diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java index 67045d1..ca23300 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java @@ -35,7 +35,6 @@ import org.apache.camel.Message; import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultAsyncProducer; -import org.apache.camel.support.DefaultMessage; import org.apache.camel.support.MessageHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; @@ -117,18 +116,20 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { .sendBuffer(Buffer.buffer(payload), response -> { if (response.succeeded()) { HttpResponse<Buffer> result = response.result(); + Message answer = exchange.getMessage(); - Message answer = new DefaultMessage(exchange.getContext()); answer.setHeader(Exchange.HTTP_RESPONSE_CODE, result.statusCode()); for (Map.Entry<String, String> entry : result.headers().entries()) { if (!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) { - KnativeHttpSupport.appendHeader(answer.getHeaders(), entry.getKey(), entry.getValue()); + answer.setHeader(entry.getKey(), entry.getValue()); } } if (result.body() != null) { answer.setBody(result.body().getBytes()); + } else { + answer.setBody(null); } if (result.statusCode() < 200 || result.statusCode() >= 300) { @@ -143,8 +144,6 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { } answer.setHeader(Exchange.HTTP_RESPONSE_CODE, result.statusCode()); - - exchange.setMessage(answer); } else if (response.failed()) { String exceptionMessage = "HTTP operation failed invoking " + URISupport.sanitizeUri(this.uri.get()); if (response.result() != null) { diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java index c74e0ab..67bb64f 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java @@ -24,14 +24,10 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import io.vertx.core.http.HttpServerRequest; -import org.apache.camel.AsyncCallback; -import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.Processor; import org.apache.camel.component.knative.spi.CloudEvent; import org.apache.camel.component.knative.spi.Knative; import org.apache.camel.component.knative.spi.KnativeEnvironment; -import org.apache.camel.support.processor.DelegateAsyncProcessor; public final class KnativeHttpSupport { private KnativeHttpSupport() { @@ -97,46 +93,25 @@ public final class KnativeHttpSupport { /** * Removes cloud event headers at the end of the processing. */ - public static Processor withoutCloudEventHeaders(Processor delegate, CloudEvent ce) { - return new DelegateAsyncProcessor(delegate) { - @Override - public boolean process(Exchange exchange, AsyncCallback callback) { - return processor.process(exchange, doneSync -> { - final Message message = exchange.getMessage(); - - // remove CloudEvent headers - for (CloudEvent.Attribute attr : ce.attributes()) { - message.removeHeader(attr.http()); - } - - callback.done(doneSync); - }); - } - }; + public static void removeCloudEventHeaders(CloudEvent ce, Message message) { + // remove CloudEvent headers + for (CloudEvent.Attribute attr : ce.attributes()) { + message.removeHeader(attr.http()); + message.removeHeader(attr.id()); + } } /** * Remap camel headers to cloud event http headers. */ - public static Processor remapCloudEventHeaders(Processor delegate, CloudEvent ce) { - return new DelegateAsyncProcessor(delegate) { - @Override - public boolean process(Exchange exchange, AsyncCallback callback) { - return processor.process(exchange, doneSync -> { - final Message message = exchange.getMessage(); - - // remap CloudEvent camel --> http - for (CloudEvent.Attribute attr : ce.attributes()) { - Object value = message.getHeader(attr.id()); - if (value != null) { - message.setHeader(attr.http(), value); - } - } - - callback.done(doneSync); - }); + public static void remapCloudEventHeaders(CloudEvent ce, Message message) { + // remap CloudEvent camel --> http + for (CloudEvent.Attribute attr : ce.attributes()) { + Object value = message.getHeader(attr.id()); + if (value != null) { + message.setHeader(attr.http(), value); } - }; + } } } diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java index af3f7d4..a9b0613 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java @@ -96,13 +96,7 @@ public class KnativeHttpTransport extends ServiceSupport implements CamelContext @Override public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) { - Processor next = KnativeHttpSupport.remapCloudEventHeaders(processor, config.getCloudEvent()); - - if (config.isRemoveCloudEventHeadersInReply()) { - next = KnativeHttpSupport.withoutCloudEventHeaders(next, config.getCloudEvent()); - } - - return new KnativeHttpConsumer(config, endpoint, service, this.router, next); + return new KnativeHttpConsumer(config, endpoint, service, this.router, processor); } } diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java index cf3a507..eecbfe6 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java @@ -33,6 +33,7 @@ import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.BodyHandler; import org.apache.camel.CamelContext; import org.apache.camel.component.platform.http.PlatformHttpConstants; +import org.apache.camel.k.test.AvailablePortFinder; import org.apache.camel.support.service.ServiceSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,13 +45,17 @@ public class KnativeHttpServer extends ServiceSupport { private final String host; private final int port; private final String path; + private final BlockingQueue<HttpServerRequest> requests; + private final Handler<RoutingContext> handler; private Vertx vertx; private Router router; private ExecutorService executor; private HttpServer server; - private BlockingQueue<HttpServerRequest> requests; - private Handler<RoutingContext> handler; + + public KnativeHttpServer(CamelContext context) { + this(context, "localhost", AvailablePortFinder.getNextAvailable(), "/", null); + } public KnativeHttpServer(CamelContext context, int port) { this(context, "localhost", port, "/", null); @@ -60,10 +65,22 @@ public class KnativeHttpServer extends ServiceSupport { this(context, "localhost", port, "/", handler); } + public KnativeHttpServer(CamelContext context, Handler<RoutingContext> handler) { + this(context, "localhost", AvailablePortFinder.getNextAvailable(), "/", handler); + } + public KnativeHttpServer(CamelContext context, String host, int port, String path) { this(context, host, port, path, null); } + public KnativeHttpServer(CamelContext context, String host, String path) { + this(context, host, AvailablePortFinder.getNextAvailable(), path, null); + } + + public KnativeHttpServer(CamelContext context, String host, String path, Handler<RoutingContext> handler) { + this(context, host, AvailablePortFinder.getNextAvailable(), path, handler); + } + public KnativeHttpServer(CamelContext context, String host, int port, String path, Handler<RoutingContext> handler) { this.context = context; this.host = host; @@ -78,6 +95,18 @@ public class KnativeHttpServer extends ServiceSupport { }; } + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getPath() { + return path; + } + public HttpServerRequest poll(int timeout, TimeUnit unit) throws InterruptedException { return requests.poll(timeout, unit); } diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index caaf91b..63e18cb 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -57,13 +57,13 @@ import static io.restassured.RestAssured.config; import static io.restassured.RestAssured.given; import static io.restassured.config.EncoderConfig.encoderConfig; import static org.apache.camel.component.knative.http.KnativeHttpTestSupport.configureKnativeComponent; +import static org.apache.camel.component.knative.http.KnativeHttpTestSupport.httpAttribute; import static org.apache.camel.component.knative.spi.KnativeEnvironment.channel; import static org.apache.camel.component.knative.spi.KnativeEnvironment.endpoint; import static org.apache.camel.component.knative.spi.KnativeEnvironment.event; import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceChannel; import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceEndpoint; import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceEvent; -import static org.apache.camel.util.CollectionHelper.mapOf; import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.Matchers.emptyOrNullString; import static org.hamcrest.Matchers.is; @@ -73,6 +73,7 @@ public class KnativeHttpTest { private CamelContext context; private ProducerTemplate template; private int platformHttpPort; + private String platformHttpHost; // ************************** // @@ -84,6 +85,7 @@ public class KnativeHttpTest { public void before() { this.context = new DefaultCamelContext(); this.template = this.context.createProducerTemplate(); + this.platformHttpHost = "localhost"; this.platformHttpPort = AvailablePortFinder.getNextAvailable(); PlatformHttpServiceContextCustomizer httpService = new PlatformHttpServiceContextCustomizer(); @@ -124,8 +126,8 @@ public class KnativeHttpTest { ce, sourceEndpoint( "myEndpoint", - mapOf( - Knative.SERVICE_META_PATH, path, + Map.of( + Knative.SERVICE_META_PATH, ObjectHelper.supplyIfEmpty(path, () -> "/"), Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) @@ -160,11 +162,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere") .when() .post(targetPath) .then() @@ -206,9 +208,9 @@ public class KnativeHttpTest { endpoint( Knative.EndpointKind.sink, "myEndpoint", - "localhost", + platformHttpHost, platformHttpPort, - mapOf( + Map.of( Knative.SERVICE_META_PATH, "/a/path", Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" @@ -225,12 +227,12 @@ public class KnativeHttpTest { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "knative://endpoint/myEndpoint"); + mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()); + mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event"); + mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "knative://endpoint/myEndpoint"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http())); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME))); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -250,7 +252,7 @@ public class KnativeHttpTest { "myEndpoint", "none", -1, - mapOf( + Map.of( Knative.SERVICE_META_PATH, "/does/not/exist", Knative.SERVICE_META_URL, String.format("http://localhost:%d/a/path", platformHttpPort), Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", @@ -268,12 +270,12 @@ public class KnativeHttpTest { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "knative://endpoint/myEndpoint"); + mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()); + mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event"); + mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "knative://endpoint/myEndpoint"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http())); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME))); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -291,7 +293,7 @@ public class KnativeHttpTest { ce, sourceEndpoint( "myEndpoint", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) @@ -318,7 +320,7 @@ public class KnativeHttpTest { given() .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE) .body( - mapOf( + Map.of( "cloudEventsVersion", ce.version(), "eventType", "org.apache.camel.event", "eventID", "myEventID", @@ -337,7 +339,7 @@ public class KnativeHttpTest { given() .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE) .body( - mapOf( + Map.of( "specversion", ce.version(), "type", "org.apache.camel.event", "id", "myEventID", @@ -356,7 +358,7 @@ public class KnativeHttpTest { given() .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE) .body( - mapOf( + Map.of( "specversion", ce.version(), "type", "org.apache.camel.event", "id", "myEventID", @@ -375,7 +377,7 @@ public class KnativeHttpTest { given() .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE) .body( - mapOf( + Map.of( "specversion", ce.version(), "type", "org.apache.camel.event", "id", "myEventID", @@ -405,7 +407,7 @@ public class KnativeHttpTest { ce, sourceEndpoint( "myEndpoint", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) @@ -431,11 +433,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere") .when() .post() .then() @@ -452,17 +454,17 @@ public class KnativeHttpTest { ce, sourceEndpoint( "ep1", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1" + Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1" )), sourceEndpoint( "ep2", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2" + Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2" )) ); @@ -500,11 +502,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID1") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1") .when() .post() .then() @@ -513,11 +515,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID2") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2") .when() .post() .then() @@ -535,17 +537,17 @@ public class KnativeHttpTest { ce, sourceEndpoint( "ep1", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE[01234]" + Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE[01234]" )), sourceEndpoint( "ep2", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE[56789]" + Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE[56789]" )) ); @@ -583,11 +585,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE0") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID1") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE0") .when() .post() .then() @@ -596,11 +598,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE5") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID2") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE5") .when() .post() .then() @@ -653,11 +655,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event1") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event1") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID1") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1") .when() .post() .then() @@ -666,11 +668,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event2") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event2") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID2") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2") .when() .post() .then() @@ -688,17 +690,17 @@ public class KnativeHttpTest { ce, sourceEndpoint( "from", - mapOf( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Map.of( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event.from", Knative.CONTENT_TYPE, "text/plain" )), endpoint( Knative.EndpointKind.sink, "to", - "localhost", + platformHttpHost, platformHttpPort, - mapOf( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Map.of( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event.to", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -707,9 +709,9 @@ public class KnativeHttpTest { b.from("knative:endpoint/from") .convertBodyTo(String.class) .setBody() - .constant("consumer") + .constant("consumer") .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE) - .constant("custom"); + .constant("custom"); b.from("direct:source") .to("knative://endpoint/to") .log("${body}") @@ -718,7 +720,7 @@ public class KnativeHttpTest { MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class); mock.expectedBodiesReceived("consumer"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), null); + mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event.to"); mock.expectedMessageCount(1); context.start(); @@ -735,16 +737,16 @@ public class KnativeHttpTest { ce, sourceEndpoint( "from", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )), endpoint( Knative.EndpointKind.sink, "to", - "localhost", + platformHttpHost, platformHttpPort, - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) @@ -754,9 +756,9 @@ public class KnativeHttpTest { b.from("knative:endpoint/from?replyWithCloudEvent=true") .convertBodyTo(String.class) .setBody() - .constant("consumer") + .constant("consumer") .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE) - .constant("custom"); + .constant("custom"); b.from("direct:source") .to("knative://endpoint/to") .log("${body}") @@ -765,7 +767,7 @@ public class KnativeHttpTest { MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class); mock.expectedBodiesReceived("consumer"); - mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "custom"); + mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "custom"); mock.expectedMessageCount(1); context.start(); @@ -785,7 +787,7 @@ public class KnativeHttpTest { "test", "", platformHttpPort, - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) @@ -815,9 +817,9 @@ public class KnativeHttpTest { endpoint( Knative.EndpointKind.sink, "test", - "localhost", + platformHttpHost, platformHttpPort, - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) @@ -846,7 +848,7 @@ public class KnativeHttpTest { ce, sourceEndpoint( "ep1", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + "h", "h1" @@ -854,7 +856,7 @@ public class KnativeHttpTest { ), sourceEndpoint( "ep2", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + "h", "h2" @@ -897,7 +899,7 @@ public class KnativeHttpTest { ce, sourceEndpoint( "ep1", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + "h", "h1" @@ -905,7 +907,7 @@ public class KnativeHttpTest { ), sourceEndpoint( "ep2", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + "h", "h2" @@ -951,9 +953,9 @@ public class KnativeHttpTest { endpoint( Knative.EndpointKind.sink, "ep", - "localhost", + platformHttpHost, platformHttpPort, - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) @@ -989,15 +991,15 @@ public class KnativeHttpTest { event( Knative.EndpointKind.sink, "default", - "localhost", + platformHttpHost, platformHttpPort, - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )), sourceEvent( "default", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) @@ -1035,9 +1037,9 @@ public class KnativeHttpTest { event( Knative.EndpointKind.sink, "default", - "localhost", + platformHttpHost, platformHttpPort, - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_KIND, "MyObject", @@ -1045,7 +1047,7 @@ public class KnativeHttpTest { )), sourceEvent( "default", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_KIND, "MyOtherObject", @@ -1084,7 +1086,7 @@ public class KnativeHttpTest { ce, sourceEndpoint( "myEndpoint", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_KIND, "MyObject", @@ -1092,7 +1094,7 @@ public class KnativeHttpTest { )), sourceEndpoint( "myEndpoint", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_KIND, "MyObject", @@ -1120,11 +1122,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere") .when() .post() .then() @@ -1141,7 +1143,7 @@ public class KnativeHttpTest { ce, sourceEndpoint( "myEndpoint", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) @@ -1172,9 +1174,9 @@ public class KnativeHttpTest { endpoint( Knative.EndpointKind.sink, "myEndpoint", - "localhost", + platformHttpHost, platformHttpPort, - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) @@ -1196,8 +1198,7 @@ public class KnativeHttpTest { @ParameterizedTest @EnumSource(CloudEvents.class) void testNoContent(CloudEvent ce) throws Exception { - final int wordsPort = AvailablePortFinder.getNextAvailable(); - final KnativeHttpServer server = new KnativeHttpServer(context, wordsPort, event -> { + final KnativeHttpServer server = new KnativeHttpServer(context, event -> { event.response().setStatusCode(204); event.response().end(""); }); @@ -1210,25 +1211,25 @@ public class KnativeHttpTest { "messages", null, -1, - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )), channel( Knative.EndpointKind.sink, "messages", - "localhost", + platformHttpHost, platformHttpPort, - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )), channel( Knative.EndpointKind.sink, "words", - "localhost", - wordsPort, - mapOf( + server.getHost(), + server.getPort(), + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) @@ -1278,11 +1279,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere") .when() .post() .then() @@ -1315,11 +1316,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere") .when() .post() .then() @@ -1352,11 +1353,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere") .when() .post() .then() @@ -1373,7 +1374,7 @@ public class KnativeHttpTest { .limit(10) .mapToObj(i -> sourceEndpoint( "ep-" + i, - mapOf(Knative.KNATIVE_FILTER_PREFIX + "MyHeader", "channel-" + i))) + Map.of(Knative.KNATIVE_FILTER_PREFIX + "MyHeader", "channel-" + i))) .collect(Collectors.toList()); configureKnativeComponent(context, ce, hops); @@ -1413,7 +1414,7 @@ public class KnativeHttpTest { @EnumSource(CloudEvents.class) void testHeaders(CloudEvent ce) throws Exception { final int port = AvailablePortFinder.getNextAvailable(); - final KnativeHttpServer server = new KnativeHttpServer(context, port); + final KnativeHttpServer server = new KnativeHttpServer(context); configureKnativeComponent( context, @@ -1421,9 +1422,9 @@ public class KnativeHttpTest { endpoint( Knative.EndpointKind.sink, "ep", - "localhost", - port, - mapOf( + server.getHost(), + server.getPort(), + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) @@ -1432,19 +1433,31 @@ public class KnativeHttpTest { RouteBuilder.addRoutes(context, b -> { b.from("direct:start") - .to("knative:endpoint/ep"); + .setHeader("CamelDummyHeader").constant("test") + .to("knative:endpoint/ep") + .to("direct:mock"); + b.from("direct:mock") + .to("mock:ep"); }); context.start(); + try { + MockEndpoint mock = context.getEndpoint("mock:ep", MockEndpoint.class); + mock.expectedHeaderReceived("CamelDummyHeader", "test"); + mock.expectedMessageCount(1); + server.start(); + template.sendBody("direct:start", ""); + mock.assertIsSatisfied(); + HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("org.apache.camel.event"); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep"); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version()); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("org.apache.camel.event"); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull(); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo("knative://endpoint/ep"); assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); } finally { server.stop(); @@ -1453,12 +1466,54 @@ public class KnativeHttpTest { @ParameterizedTest @EnumSource(CloudEvents.class) - void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception { + void testHeadersInReply(CloudEvent ce) throws Exception { final int port = AvailablePortFinder.getNextAvailable(); - final KnativeHttpServer server = new KnativeHttpServer(context, port); - final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); + final KnativeHttpServer server = new KnativeHttpServer(context); + + configureKnativeComponent( + context, + ce, + endpoint( + Knative.EndpointKind.sink, + "ep", + server.getHost(), + server.getPort(), + Map.of( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + ) + ) + ); + + RouteBuilder.addRoutes(context, b -> { + b.from("direct:start") + .setHeader("CamelDummyHeader").constant("test") + .to("knative:endpoint/ep"); + }); + + context.start(); + + try { + MockEndpoint mock = context.getEndpoint("mock:ep", MockEndpoint.class); + mock.expectedHeaderReceived("CamelDummyHeader", "test"); + mock.expectedMessageCount(1); + + server.start(); + + Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody(null)); + assertThat(exchange.getMessage().getHeaders()).containsEntry("CamelDummyHeader", "test"); + } finally { + server.stop(); + } + } + + @ParameterizedTest + @EnumSource(CloudEvents.class) + void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception { + final KnativeHttpServer server = new KnativeHttpServer(context); + final String typeHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE); final String typeHeaderVal = UUID.randomUUID().toString(); - final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(); + final String sourceHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE); final String sourceHeaderVal = UUID.randomUUID().toString(); configureKnativeComponent( @@ -1467,9 +1522,9 @@ public class KnativeHttpTest { endpoint( Knative.EndpointKind.sink, "ep", - "localhost", - port, - mapOf( + server.getHost(), + server.getPort(), + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal, @@ -1489,10 +1544,10 @@ public class KnativeHttpTest { template.sendBody("direct:start", ""); HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version()); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull(); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal); assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); } finally { server.stop(); @@ -1502,11 +1557,10 @@ public class KnativeHttpTest { @ParameterizedTest @EnumSource(CloudEvents.class) void testHeadersOverrideFromURI(CloudEvent ce) throws Exception { - final int port = AvailablePortFinder.getNextAvailable(); - final KnativeHttpServer server = new KnativeHttpServer(context, port); - final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); + final KnativeHttpServer server = new KnativeHttpServer(context); + final String typeHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE); final String typeHeaderVal = UUID.randomUUID().toString(); - final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(); + final String sourceHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE); final String sourceHeaderVal = UUID.randomUUID().toString(); configureKnativeComponent( @@ -1515,9 +1569,9 @@ public class KnativeHttpTest { endpoint( Knative.EndpointKind.sink, "ep", - "localhost", - port, - mapOf( + server.getHost(), + server.getPort(), + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) @@ -1537,10 +1591,10 @@ public class KnativeHttpTest { template.sendBody("direct:start", ""); HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version()); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull(); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal); assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); } finally { server.stop(); @@ -1550,11 +1604,10 @@ public class KnativeHttpTest { @ParameterizedTest @EnumSource(CloudEvents.class) void testHeadersOverrideFromConf(CloudEvent ce) throws Exception { - final int port = AvailablePortFinder.getNextAvailable(); - final KnativeHttpServer server = new KnativeHttpServer(context, port); - final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); + final KnativeHttpServer server = new KnativeHttpServer(context); + final String typeHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE); final String typeHeaderVal = UUID.randomUUID().toString(); - final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(); + final String sourceHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE); final String sourceHeaderVal = UUID.randomUUID().toString(); KnativeComponent component = configureKnativeComponent( @@ -1563,16 +1616,16 @@ public class KnativeHttpTest { endpoint( Knative.EndpointKind.sink, "ep", - "localhost", - port, - mapOf( + server.getHost(), + server.getPort(), + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) ) ); - component.getConfiguration().setCeOverride(mapOf( + component.getConfiguration().setCeOverride(Map.of( Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal, Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal )); @@ -1588,10 +1641,10 @@ public class KnativeHttpTest { template.sendBody("direct:start", ""); HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version()); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull(); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal); assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); } finally { server.stop(); @@ -1601,8 +1654,7 @@ public class KnativeHttpTest { @ParameterizedTest @EnumSource(CloudEvents.class) void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws Exception { - final int port = AvailablePortFinder.getNextAvailable(); - final KnativeHttpServer server = new KnativeHttpServer(context, port); + final KnativeHttpServer server = new KnativeHttpServer(context); configureKnativeComponent( context, @@ -1610,9 +1662,9 @@ public class KnativeHttpTest { endpoint( Knative.EndpointKind.sink, "ep", - "localhost", - port, - mapOf( + server.getHost(), + server.getPort(), + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) @@ -1631,10 +1683,10 @@ public class KnativeHttpTest { template.sendBody("direct:start", ""); HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("myType"); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep"); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version()); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("myType"); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull(); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo("knative://endpoint/ep"); assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); } finally { server.stop(); @@ -1644,8 +1696,7 @@ public class KnativeHttpTest { @ParameterizedTest @EnumSource(CloudEvents.class) void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws Exception { - final int port = AvailablePortFinder.getNextAvailable(); - final KnativeHttpServer server = new KnativeHttpServer(context, port); + final KnativeHttpServer server = new KnativeHttpServer(context); configureKnativeComponent( context, @@ -1653,9 +1704,9 @@ public class KnativeHttpTest { endpoint( Knative.EndpointKind.sink, "ep", - "localhost", - port, - mapOf( + server.getHost(), + server.getPort(), + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) @@ -1664,7 +1715,7 @@ public class KnativeHttpTest { RouteBuilder.addRoutes(context, b -> { b.from("direct:start") - .setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http()).constant("fromCEHeader") + .setHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE)).constant("fromCEHeader") .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("fromCamelHeader") .to("knative:endpoint/ep"); }); @@ -1675,10 +1726,10 @@ public class KnativeHttpTest { template.sendBody("direct:start", ""); HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("fromCEHeader"); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull(); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep"); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version()); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("fromCEHeader"); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull(); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo("knative://endpoint/ep"); assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); } finally { server.stop(); @@ -1688,8 +1739,7 @@ public class KnativeHttpTest { @ParameterizedTest @EnumSource(CloudEvents.class) void testEventBridge(CloudEvent ce) throws Exception { - final int port = AvailablePortFinder.getNextAvailable(); - final KnativeHttpServer server = new KnativeHttpServer(context, port); + final KnativeHttpServer server = new KnativeHttpServer(context); configureKnativeComponent( context, @@ -1697,14 +1747,14 @@ public class KnativeHttpTest { event( Knative.EndpointKind.sink, "event.sink", - "localhost", - port, - mapOf( + server.getHost(), + server.getPort(), + Map.of( Knative.CONTENT_TYPE, "text/plain" )), sourceEvent( "event.source", - mapOf( + Map.of( Knative.CONTENT_TYPE, "text/plain" )) ); @@ -1722,19 +1772,19 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event.source") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event.source") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere") .when() .post() .then() .statusCode(204); HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("event.sink"); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version()); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("event.sink"); assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); } finally { server.stop(); @@ -1745,7 +1795,7 @@ public class KnativeHttpTest { @EnumSource(CloudEvents.class) void testDynamicEventBridge(CloudEvent ce) throws Exception { final int port = AvailablePortFinder.getNextAvailable(); - final KnativeHttpServer server = new KnativeHttpServer(context, port); + final KnativeHttpServer server = new KnativeHttpServer(context); configureKnativeComponent( context, @@ -1753,14 +1803,14 @@ public class KnativeHttpTest { event( Knative.EndpointKind.sink, "default", - "localhost", - port, - mapOf( + server.getHost(), + server.getPort(), + Map.of( Knative.CONTENT_TYPE, "text/plain" )), sourceEvent( "event.source", - mapOf( + Map.of( Knative.CONTENT_TYPE, "text/plain" )) ); @@ -1779,19 +1829,19 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event.source") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event.source") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere") .when() .post() .then() .statusCode(204); HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version()); - assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("event.sink"); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version()); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("event.sink"); assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); } finally { server.stop(); @@ -1801,8 +1851,7 @@ public class KnativeHttpTest { @ParameterizedTest @EnumSource(CloudEvents.class) void testSlowConsumer(CloudEvent ce) throws Exception { - final int port = AvailablePortFinder.getNextAvailable(); - final KnativeHttpServer server = new KnativeHttpServer(context, port, event -> { + final KnativeHttpServer server = new KnativeHttpServer(context, event -> { event.vertx().executeBlocking( promise -> { try { @@ -1825,7 +1874,7 @@ public class KnativeHttpTest { ce, sourceEndpoint( "start", - mapOf( + Map.of( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) @@ -1837,7 +1886,7 @@ public class KnativeHttpTest { RouteBuilder.addRoutes(context, b -> { b.from("knative:endpoint/start") .removeHeaders("Camel*") - .toF("http://localhost:%d", port); + .toF("http://%s:%d", server.getHost(), server.getPort()); }); context.start(); @@ -1845,11 +1894,11 @@ public class KnativeHttpTest { given() .body("test") .header(Exchange.CONTENT_TYPE, "text/plain") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) - .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version()) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID") + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere") .when() .post() .then() diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java index 8bf3702..bbe209d 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java @@ -39,4 +39,8 @@ public final class KnativeHttpTestSupport { return component; } + + public static String httpAttribute(CloudEvent ce, String name) { + return ce.mandatoryAttribute(name).http(); + } } diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/assertions/HttpServerRequestAssert.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/assertions/HttpServerRequestAssert.java new file mode 100644 index 0000000..19250b4 --- /dev/null +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/assertions/HttpServerRequestAssert.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.knative.http.assertions; + +import java.util.Objects; + +import io.vertx.core.http.HttpServerRequest; +import org.assertj.core.api.AbstractAssert; +import org.assertj.core.api.AbstractStringAssert; +import org.assertj.core.api.AssertionsForClassTypes; + +public class HttpServerRequestAssert extends AbstractAssert<HttpServerRequestAssert, HttpServerRequest> { + public HttpServerRequestAssert(HttpServerRequest request) { + super(request, HttpServerRequest.class); + } + + public static HttpServerRequestAssert assertThat(HttpServerRequest actual) { + return new HttpServerRequestAssert(actual); + } + + + public AbstractStringAssert<?> header(String name) { + isNotNull(); + + return AssertionsForClassTypes.assertThat(actual.getHeader(name)); + } + + public HttpServerRequestAssert hasHeader(String name) { + isNotNull(); + + if (Objects.isNull(actual.getHeader(name))) { + //failWithMessage("Expected header name to be <%s> but was <%s>", name, actual.getName()); + failWithMessage("Expected header %s not present", name); + } + + return this; + } + + public HttpServerRequestAssert hasHeader(String name, String value) { + isNotNull(); + + if (Objects.isNull(actual.getHeader(name))) { + failWithMessage("Expected header %s not present", name); + } + + if (Objects.equals(actual.getHeader(name), value)) { + failWithMessage("Expected header %s to be <%s> but was <%s>", name, value, actual.getHeader(name)); + } + + return this; + } +} diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java index b38f510..0e2b235 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java @@ -128,7 +128,7 @@ public class KnativeConfiguration implements Cloneable { * Set the transport options. */ public void setTransportOptions(Map<String, Object> transportOptions) { - this.transportOptions = transportOptions; + this.transportOptions = new HashMap<>(transportOptions); } /** @@ -150,7 +150,7 @@ public class KnativeConfiguration implements Cloneable { * Set the filters. */ public void setFilters(Map<String, Object> filters) { - this.filters = filters; + this.filters = new HashMap<>(filters); } public Map<String, Object> getCeOverride() { @@ -161,7 +161,7 @@ public class KnativeConfiguration implements Cloneable { * CloudEvent headers to override */ public void setCeOverride(Map<String, Object> ceOverride) { - this.ceOverride = ceOverride; + this.ceOverride = new HashMap<>(ceOverride); } public String getApiVersion() {