This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push: new 1adf649 Can't bridge two Knative channels with new component #163 new 66ac3bb Merge pull request #166 from lburgazzoli/github-163 1adf649 is described below commit 1adf6499a95d8b573d244de451005db4f3de6027 Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Thu Oct 17 13:08:04 2019 +0200 Can't bridge two Knative channels with new component #163 --- .../knative/http/KnativeHttpConsumer.java | 32 +-- .../component/knative/http/KnativeHttpTest.java | 266 ++++++++++++--------- .../knative/http/KnativeHttpTestSupport.java | 36 +++ 3 files changed, 210 insertions(+), 124 deletions(-) diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java index cb99b0d..0091dcb 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.function.Predicate; import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpHeaders; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; @@ -99,21 +98,24 @@ public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp. getAsyncProcessor().process(exchange, doneSync -> { try { HttpServerResponse response = toHttpResponse(request, exchange.getMessage()); - Buffer body = computeResponseBody(exchange.getMessage()); + Buffer body = null; - // set the content type in the response. - String contentType = MessageHelper.getContentType(exchange.getMessage()); - if (contentType != null) { - // set content-type - response.putHeader(Exchange.CONTENT_TYPE, contentType); + if (request.response().getStatusCode() != 204) { + body = computeResponseBody(exchange.getMessage()); + + // set the content type in the response. + String contentType = MessageHelper.getContentType(exchange.getMessage()); + if (contentType != null) { + // set content-type + response.putHeader(Exchange.CONTENT_TYPE, contentType); + } } - if (body == null) { - request.response().setStatusCode(204); - request.response().putHeader(HttpHeaders.CONTENT_TYPE, "text/plain"); - request.response().end("No response available"); - } else { + if (body != null) { request.response().end(body); + } else { + request.response().setStatusCode(204); + request.response().end(); } } catch (Exception e) { getExceptionHandler().handleException(e); @@ -214,9 +216,9 @@ public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp. ExchangeHelper.setFailureHandled(message.getExchange()); } - return Buffer.buffer( - message.getExchange().getContext().getTypeConverter().mandatoryConvertTo(byte[].class, body) - ); + return body != null + ? Buffer.buffer(message.getExchange().getContext().getTypeConverter().mandatoryConvertTo(byte[].class, body)) + : null; } } diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index cd572ec..c89e6a3 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.stream.Stream; import com.fasterxml.jackson.databind.ObjectMapper; +import io.undertow.Undertow; import org.apache.camel.CamelContext; import org.apache.camel.CamelException; import org.apache.camel.Exchange; @@ -31,7 +32,6 @@ import org.apache.camel.component.knative.KnativeComponent; import org.apache.camel.component.knative.spi.CloudEvent; import org.apache.camel.component.knative.spi.CloudEvents; import org.apache.camel.component.knative.spi.Knative; -import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.component.knative.spi.KnativeSupport; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.http.common.HttpOperationFailedException; @@ -44,6 +44,10 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import static org.apache.camel.component.knative.http.KnativeHttpTestSupport.configureKnativeComponent; +import static org.apache.camel.component.knative.spi.KnativeEnvironment.channel; +import static org.apache.camel.component.knative.spi.KnativeEnvironment.endpoint; +import static org.apache.camel.component.knative.spi.KnativeEnvironment.event; import static org.assertj.core.api.Assertions.assertThat; public class KnativeHttpTest { @@ -99,8 +103,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testInvokeEndpoint(CloudEvent ce) throws Exception { - KnativeEnvironment env = KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.sink, "myEndpoint", "localhost", @@ -112,11 +118,6 @@ public class KnativeHttpTest { )) ); - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setProtocol(Knative.Protocol.http); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(env); - context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -147,8 +148,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testConsumeStructuredContent(CloudEvent ce) throws Exception { - KnativeEnvironment env = KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.source, "myEndpoint", "localhost", @@ -160,10 +163,6 @@ public class KnativeHttpTest { )) ); - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(env); - context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -233,8 +232,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testConsumeContent(CloudEvent ce) throws Exception { - KnativeEnvironment env = KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.source, "myEndpoint", "localhost", @@ -246,10 +247,6 @@ public class KnativeHttpTest { )) ); - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(env); - context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -292,8 +289,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testConsumeContentWithFilter(CloudEvent ce) throws Exception { - KnativeEnvironment env = KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.source, "ep1", "localhost", @@ -303,7 +302,7 @@ public class KnativeHttpTest { Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE1" )), - KnativeEnvironment.endpoint( + endpoint( Knative.EndpointKind.source, "ep2", "localhost", @@ -315,10 +314,6 @@ public class KnativeHttpTest { )) ); - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(env); - context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -388,8 +383,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testConsumeContentWithRegExFilter(CloudEvent ce) throws Exception { - KnativeEnvironment env = KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.source, "ep1", "localhost", @@ -399,7 +396,7 @@ public class KnativeHttpTest { Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE[01234]" )), - KnativeEnvironment.endpoint( + endpoint( Knative.EndpointKind.source, "ep2", "localhost", @@ -411,10 +408,6 @@ public class KnativeHttpTest { )) ); - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(env); - context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -484,18 +477,16 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testConsumeEventContent(CloudEvent ce) throws Exception { - KnativeEnvironment env = KnativeEnvironment.on( - KnativeEnvironment.event( + configureKnativeComponent( + context, + ce, + event( Knative.EndpointKind.source, "default", "localhost", port) ); - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(env); - context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -565,8 +556,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testReply(CloudEvent ce) throws Exception { - KnativeEnvironment env = KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.source, "from", "localhost", @@ -575,7 +568,7 @@ public class KnativeHttpTest { Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )), - KnativeEnvironment.endpoint( + endpoint( Knative.EndpointKind.sink, "to", "localhost", @@ -586,10 +579,6 @@ public class KnativeHttpTest { )) ); - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(env); - context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -617,10 +606,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testInvokeServiceWithoutHost(CloudEvent ce) throws Exception { - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.sink, "test", "", @@ -630,7 +619,7 @@ public class KnativeHttpTest { Knative.CONTENT_TYPE, "text/plain" ) ) - )); + ); RouteBuilder.addRoutes(context, b -> { b.from("direct:start") @@ -649,10 +638,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testInvokeNotExistingEndpoint(CloudEvent ce) throws Exception { - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.sink, "test", "localhost", @@ -662,7 +651,7 @@ public class KnativeHttpTest { Knative.CONTENT_TYPE, "text/plain" ) ) - )); + ); RouteBuilder.addRoutes(context, b -> { b.from("direct:start") @@ -681,10 +670,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testRemoveConsumer(CloudEvent ce) throws Exception { - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.source, "ep1", "localhost", @@ -695,7 +684,7 @@ public class KnativeHttpTest { Knative.KNATIVE_FILTER_PREFIX + "h", "h1" ) ), - KnativeEnvironment.endpoint( + endpoint( Knative.EndpointKind.source, "ep2", "localhost", @@ -706,7 +695,7 @@ public class KnativeHttpTest { Knative.KNATIVE_FILTER_PREFIX + "h", "h2" ) ) - )); + ); RouteBuilder.addRoutes(context, b -> { b.from("knative:endpoint/ep1") @@ -738,10 +727,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testAddConsumer(CloudEvent ce) throws Exception { - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.source, "ep1", "localhost", @@ -752,7 +741,7 @@ public class KnativeHttpTest { Knative.KNATIVE_FILTER_PREFIX + "h", "h1" ) ), - KnativeEnvironment.endpoint( + endpoint( Knative.EndpointKind.source, "ep2", "localhost", @@ -763,7 +752,7 @@ public class KnativeHttpTest { Knative.KNATIVE_FILTER_PREFIX + "h", "h2" ) ) - )); + ); RouteBuilder.addRoutes(context, b -> { b.from("knative:endpoint/ep1") @@ -797,10 +786,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testInvokeEndpointWithError(CloudEvent ce) throws Exception { - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.sink, "ep", "localhost", @@ -810,7 +799,7 @@ public class KnativeHttpTest { Knative.CONTENT_TYPE, "text/plain" ) ) - )); + ); RouteBuilder.addRoutes(context, b -> { b.from("direct:start") @@ -835,8 +824,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testEvents(CloudEvent ce) throws Exception { - KnativeEnvironment env = KnativeEnvironment.on( - KnativeEnvironment.event( + configureKnativeComponent( + context, + ce, + event( Knative.EndpointKind.sink, "default", "localhost", @@ -845,7 +836,7 @@ public class KnativeHttpTest { Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )), - KnativeEnvironment.event( + event( Knative.EndpointKind.source, "default", "localhost", @@ -856,11 +847,6 @@ public class KnativeHttpTest { )) ); - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setProtocol(Knative.Protocol.http); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(env); - context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -890,8 +876,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testEventsWithTypeAndVersion(CloudEvent ce) throws Exception { - KnativeEnvironment env = KnativeEnvironment.on( - KnativeEnvironment.event( + configureKnativeComponent( + context, + ce, + event( Knative.EndpointKind.sink, "default", "localhost", @@ -902,7 +890,7 @@ public class KnativeHttpTest { Knative.KNATIVE_KIND, "MyObject", Knative.KNATIVE_API_VERSION, "v1" )), - KnativeEnvironment.event( + event( Knative.EndpointKind.source, "default", "localhost", @@ -915,11 +903,6 @@ public class KnativeHttpTest { )) ); - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setProtocol(Knative.Protocol.http); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(env); - context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -950,8 +933,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testConsumeContentWithTypeAndVersion(CloudEvent ce) throws Exception { - KnativeEnvironment env = KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.source, "myEndpoint", "localhost", @@ -962,7 +947,7 @@ public class KnativeHttpTest { Knative.KNATIVE_KIND, "MyObject", Knative.KNATIVE_API_VERSION, "v1" )), - KnativeEnvironment.endpoint( + endpoint( Knative.EndpointKind.source, "myEndpoint", "localhost", @@ -975,10 +960,6 @@ public class KnativeHttpTest { )) ); - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(env); - context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -1020,8 +1001,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testWrongMethod(CloudEvent ce) throws Exception { - KnativeEnvironment env = KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.source, "myEndpoint", "localhost", @@ -1032,10 +1015,6 @@ public class KnativeHttpTest { )) ); - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(env); - context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -1058,8 +1037,10 @@ public class KnativeHttpTest { @ParameterizedTest @MethodSource("provideCloudEventsImplementations") void testNoBody(CloudEvent ce) throws Exception { - KnativeEnvironment env = KnativeEnvironment.on( - KnativeEnvironment.endpoint( + configureKnativeComponent( + context, + ce, + endpoint( Knative.EndpointKind.sink, "myEndpoint", "localhost", @@ -1070,10 +1051,6 @@ public class KnativeHttpTest { )) ); - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(env); - context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { @@ -1089,5 +1066,76 @@ public class KnativeHttpTest { assertThat(exchange.getException()).isInstanceOf(IllegalArgumentException.class); assertThat(exchange.getException()).hasMessage("body must not be null"); } + + + + @ParameterizedTest + @MethodSource("provideCloudEventsImplementations") + void testNoContent(CloudEvent ce) throws Exception { + final int messagesPort = AvailablePortFinder.getNextAvailable(); + final int wordsPort = AvailablePortFinder.getNextAvailable(); + + configureKnativeComponent( + context, + ce, + channel( + Knative.EndpointKind.source, + "messages", + "localhost", + messagesPort, + KnativeSupport.mapOf( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )), + channel( + Knative.EndpointKind.sink, + "messages", + "localhost", + messagesPort, + KnativeSupport.mapOf( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )), + channel( + Knative.EndpointKind.sink, + "words", + "localhost", + wordsPort, + KnativeSupport.mapOf( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )) + ); + + Undertow server = Undertow.builder() + .addHttpListener(wordsPort, "localhost") + .setHandler(exchange -> { + exchange.setStatusCode(204); + exchange.getResponseSender().send(""); + }) + .build(); + + try { + server.start(); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("knative:channel/messages") + .transform().simple("transformed ${body}") + .log("${body}") + .to("knative:channel/words"); + } + }); + + context.start(); + + Exchange exchange = template.request("knative:channel/messages", e -> e.getMessage().setBody("message")); + assertThat(exchange.getMessage().getHeaders()).containsEntry(Exchange.HTTP_RESPONSE_CODE, 204); + assertThat(exchange.getMessage().getBody()).isNull(); + } finally { + server.stop(); + } + } } diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java new file mode 100644 index 0000000..273d5fb --- /dev/null +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.knative.http; + +import org.apache.camel.CamelContext; +import org.apache.camel.component.knative.KnativeComponent; +import org.apache.camel.component.knative.spi.CloudEvent; +import org.apache.camel.component.knative.spi.KnativeEnvironment; + +public final class KnativeHttpTestSupport { + private KnativeHttpTestSupport() { + } + + public static KnativeComponent configureKnativeComponent(CamelContext context, CloudEvent ce, KnativeEnvironment.KnativeServiceDefinition... definitions) { + KnativeEnvironment env = KnativeEnvironment.on(definitions); + KnativeComponent component = context.getComponent("knative", KnativeComponent.class); + component.setCloudEventsSpecVersion(ce.version()); + component.setEnvironment(env); + + return component; + } +}