This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 359c8e1ccd4b93e66df092b02e67bd52a3cdd9ab Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Sun Jun 21 11:09:47 2020 +0200 camel-knative: add support for lookup service definition from the registry #371 --- .../component/knative/spi/KnativeEnvironment.java | 15 +++-- .../camel/component/knative/KnativeEndpoint.java | 67 +++++++++++++--------- .../component/knative/KnativeComponentTest.java | 10 ++++ 3 files changed, 59 insertions(+), 33 deletions(-) diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java index ee43bbe..76af6de 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java @@ -53,11 +53,7 @@ public class KnativeEnvironment { } public Stream<KnativeServiceDefinition> lookup(Knative.Type type, String name) { - return services.stream() - .filter(definition -> { - return Objects.equals(type.name(), definition.getMetadata().get(Knative.KNATIVE_TYPE)) - && Objects.equals(name, definition.getName()); - }); + return stream().filter(definition -> definition.matches(type, name)); } // ************************ @@ -267,5 +263,14 @@ public class KnativeEnvironment { public int getPortOrDefault(int port) { return getPort() != -1 ? getPort() : port; } + + public String getMetadata(String key) { + return getMetadata().get(key); + } + + public boolean matches(Knative.Type type, String name) { + return Objects.equals(type.name(), getMetadata(Knative.KNATIVE_TYPE)) + && Objects.equals(name, getName()); + } } } diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java index ed6674b..4b989bf 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java @@ -20,6 +20,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Stream; import org.apache.camel.Consumer; import org.apache.camel.Processor; @@ -51,12 +53,10 @@ public class KnativeEndpoint extends DefaultEndpoint { private final Knative.Type type; @UriPath(description = "The Knative name") private final String name; - + private final CloudEventProcessor cloudEvent; @UriParam private KnativeConfiguration configuration; - private final CloudEventProcessor cloudEvent; - public KnativeEndpoint(String uri, KnativeComponent component, Knative.Type type, String name, KnativeConfiguration configuration) { super(uri, component); @@ -120,14 +120,14 @@ public class KnativeEndpoint extends DefaultEndpoint { return name; } - public void setConfiguration(KnativeConfiguration configuration) { - this.configuration = configuration; - } - public KnativeConfiguration getConfiguration() { return configuration; } + public void setConfiguration(KnativeConfiguration configuration) { + this.configuration = configuration; + } + KnativeEnvironment.KnativeServiceDefinition lookupServiceDefinition(Knative.EndpointKind endpointKind) { String serviceName = configuration.getServiceName(); @@ -152,7 +152,7 @@ public class KnativeEndpoint extends DefaultEndpoint { key = Knative.KNATIVE_FILTER_PREFIX + key; } - metadata.put(key, (String)val); + metadata.put(key, (String) val); } } @@ -165,7 +165,7 @@ public class KnativeEndpoint extends DefaultEndpoint { key = Knative.KNATIVE_CE_OVERRIDE_PREFIX + key; } - metadata.put(key, (String)val); + metadata.put(key, (String) val); } } @@ -184,25 +184,9 @@ public class KnativeEndpoint extends DefaultEndpoint { } Optional<KnativeEnvironment.KnativeServiceDefinition> lookupServiceDefinition(String name, Knative.EndpointKind endpointKind) { - return this.configuration.getEnvironment() - .lookup(this.type, name) - .filter(s -> { - final String type = s.getMetadata().get(Knative.CAMEL_ENDPOINT_KIND); - final String apiv = s.getMetadata().get(Knative.KNATIVE_API_VERSION); - final String kind = s.getMetadata().get(Knative.KNATIVE_KIND); - - if (!Objects.equals(endpointKind.name(), type)) { - return false; - } - if (configuration.getApiVersion() != null && !Objects.equals(apiv, configuration.getApiVersion())) { - return false; - } - if (configuration.getKind() != null && !Objects.equals(kind, configuration.getKind())) { - return false; - } - - return true; - }) + return servicesDefinitions() + .filter(definition -> definition.matches(this.type, name)) + .filter(serviceFilter(endpointKind)) .findFirst(); } @@ -212,4 +196,31 @@ public class KnativeEndpoint extends DefaultEndpoint { !this.configuration.isReplyWithCloudEvent() ); } + + private Stream<KnativeEnvironment.KnativeServiceDefinition> servicesDefinitions() { + return Stream.concat( + getCamelContext().getRegistry().findByType(KnativeEnvironment.KnativeServiceDefinition.class).stream(), + this.configuration.getEnvironment().stream() + ); + } + + private Predicate<KnativeEnvironment.KnativeServiceDefinition> serviceFilter(Knative.EndpointKind endpointKind) { + return s -> { + final String type = s.getMetadata(Knative.CAMEL_ENDPOINT_KIND); + final String apiv = s.getMetadata(Knative.KNATIVE_API_VERSION); + final String kind = s.getMetadata(Knative.KNATIVE_KIND); + + if (!Objects.equals(endpointKind.name(), type)) { + return false; + } + if (configuration.getApiVersion() != null && !Objects.equals(apiv, configuration.getApiVersion())) { + return false; + } + if (configuration.getKind() != null && !Objects.equals(kind, configuration.getKind())) { + return false; + } + + return true; + }; + } } diff --git a/camel-knative/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java b/camel-knative/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java index 7a1870c..ac6ed7a 100644 --- a/camel-knative/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java +++ b/camel-knative/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java @@ -67,6 +67,8 @@ public class KnativeComponentTest { component.setEnvironment(env); component.setTransport(new KnativeTransportNoop()); + context.getRegistry().bind("ereg", KnativeEnvironment.endpoint(Knative.EndpointKind.source, "ereg", null, -1)); + context.getRegistry().bind("creg", KnativeEnvironment.channel(Knative.EndpointKind.source, "creg", null, -1)); context.addComponent("knative", component); // @@ -77,6 +79,10 @@ public class KnativeComponentTest { assertThat(endpoint.lookupServiceDefinition("c1", Knative.EndpointKind.source)).isPresent(); assertThat(endpoint.lookupServiceDefinition("e1", Knative.EndpointKind.source)).isNotPresent(); } + { + KnativeEndpoint endpoint = context.getEndpoint("knative:channel/creg", KnativeEndpoint.class); + assertThat(endpoint.lookupServiceDefinition("creg", Knative.EndpointKind.source)).isPresent(); + } // // Endpoints @@ -86,5 +92,9 @@ public class KnativeComponentTest { assertThat(endpoint.lookupServiceDefinition("e1", Knative.EndpointKind.source)).isPresent(); assertThat(endpoint.lookupServiceDefinition("c1", Knative.EndpointKind.source)).isNotPresent(); } + { + KnativeEndpoint endpoint = context.getEndpoint("knative:endpoint/ereg", KnativeEndpoint.class); + assertThat(endpoint.lookupServiceDefinition("ereg", Knative.EndpointKind.source)).isPresent(); + } } }