This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push: new 6408697 Fix #536: enable knative:event endpoints without path 6408697 is described below commit 6408697df2e0ac41fa80cf958ca15f0058b97a08 Author: nicolaferraro <ni.ferr...@gmail.com> AuthorDate: Wed Dec 2 17:36:51 2020 +0100 Fix #536: enable knative:event endpoints without path --- .../component/knative/http/KnativeHttpTest.java | 82 ++++++++++++++++++++++ .../camel/component/knative/KnativeEndpoint.java | 2 +- 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index 92d4c75..c225c25 100644 --- a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -1059,6 +1059,49 @@ public class KnativeHttpTest { @ParameterizedTest @EnumSource(CloudEvents.class) + void testEventsNoName(CloudEvent ce) throws Exception { + configureKnativeComponent( + context, + ce, + event( + Knative.EndpointKind.sink, + "default", + String.format("http://%s:%d", platformHttpHost, platformHttpPort), + Map.of( + Knative.CONTENT_TYPE, "text/plain" + )), + sourceEvent( + "default", + Map.of( + Knative.CONTENT_TYPE, "text/plain" + )) + ); + + RouteBuilder.addRoutes(context, b -> { + b.from("direct:source") + .to("knative:event"); + b.from("knative:event") + .to("mock:ce"); + }); + + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version()); + mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event"); + mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME)); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_ID)); + mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); + + template.sendBody("direct:source", "test"); + + mock.assertIsSatisfied(); + } + + @ParameterizedTest + @EnumSource(CloudEvents.class) void testEventsWithResourceRef(CloudEvent ce) throws Exception { configureKnativeComponent( context, @@ -1844,6 +1887,45 @@ public class KnativeHttpTest { @ParameterizedTest @EnumSource(CloudEvents.class) + void testEventDefaultType(CloudEvent ce) throws Exception { + final KnativeHttpServer server = new KnativeHttpServer(context); + + configureKnativeComponent( + context, + ce, + event( + Knative.EndpointKind.sink, + "default", + String.format("http://%s:%d", server.getHost(), server.getPort()), + Map.of( + Knative.CONTENT_TYPE, "text/plain" + ) + ) + ); + + RouteBuilder.addRoutes(context, b -> { + b.from("direct:start") + .to("knative:event"); + }); + + context.start(); + try { + server.start(); + template.sendBody("direct:start", ""); + + HttpServerRequest request = server.poll(30, TimeUnit.SECONDS); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version()); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("org.apache.camel.event"); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull(); + assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isNotNull(); + assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain"); + } finally { + server.stop(); + } + } + + @ParameterizedTest + @EnumSource(CloudEvents.class) void testSlowConsumer(CloudEvent ce) throws Exception { final KnativeHttpServer server = new KnativeHttpServer(context, event -> { event.vertx().executeBlocking( diff --git a/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java index ef668a1..0dc1b15 100644 --- a/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java +++ b/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java @@ -199,7 +199,7 @@ public class KnativeEndpoint extends DefaultEndpoint { // For event type endpoints se need to add an additional filter to filter out events received // based on the given type. // - if (resource.getType() == Knative.Type.event) { + if (resource.getType() == Knative.Type.event && ObjectHelper.isNotEmpty(resourceName)) { answer.setCloudEventType(resourceName); answer.addFilter(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, resourceName); }