CAMEL-9683: A new toService EIP that uses client-side discovery to look up alive services and pick a service IP/port to use when calling the service from a Camel route. Allows plugging in different providers.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cb7c101b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cb7c101b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cb7c101b Branch: refs/heads/remoteServiceCall Commit: cb7c101b40cf6a3c75faa60f487cd97423cac65f Parents: e46a236 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Apr 22 15:08:06 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 23 09:22:31 2016 +0200 ---------------------------------------------------------------------- .../apache/camel/model/ProcessorDefinition.java | 3 +- .../model/ToServiceConfigurationDefinition.java | 18 +++++++ .../kubernetes/KubernetesConfiguration.java | 31 ++++++++--- .../consumer/KubernetesNamespacesConsumer.java | 4 +- .../consumer/KubernetesPodsConsumer.java | 4 +- ...ubernetesReplicationControllersConsumer.java | 4 +- .../consumer/KubernetesSecretsConsumer.java | 4 +- .../consumer/KubernetesServicesConsumer.java | 4 +- .../processor/KubernetesProcessorFactory.java | 5 ++ .../processor/KubernetesServiceDiscovery.java | 1 + .../processor/KubernetesServiceProcessor.java | 21 ++++++-- .../processor/ToServiceRouteTest.java | 54 ++++++++++++++++++++ .../src/test/resources/log4j.properties | 4 +- 13 files changed, 133 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 2acf785..6c0e8f9 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ 
-661,10 +661,11 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @return the builder */ @SuppressWarnings("unchecked") - public Type toService(String name, String uri) { + public Type toService(String name, String uri, ToServiceConfigurationDefinition configuration) { ToServiceDefinition answer = new ToServiceDefinition(); answer.setName(name); answer.setUri(uri); + answer.setToServiceConfiguration(configuration); addOutput(answer); return (Type) this; } http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java index 53c63f2..f42205e 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java @@ -34,6 +34,8 @@ public class ToServiceConfigurationDefinition extends IdentifiedType { @XmlAttribute @Metadata(required = "true") private String masterUrl; @XmlAttribute + private String namespace; + @XmlAttribute private String apiVersion; @XmlAttribute @Metadata(label = "security") private String username; @@ -78,6 +80,14 @@ public class ToServiceConfigurationDefinition extends IdentifiedType { this.masterUrl = masterUrl; } + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + public String getApiVersion() { return apiVersion; } @@ -195,6 +205,14 @@ public class ToServiceConfigurationDefinition extends IdentifiedType { } /** + * Sets the namespace to use + */ + public ToServiceConfigurationDefinition namespace(String namespace) { + setNamespace(namespace); + return this; + } + + /** 
* Sets the API version */ public ToServiceConfigurationDefinition apiVersion(String apiVersion) { http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java index bd965ac..de35463 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java @@ -86,7 +86,7 @@ public class KubernetesConfiguration { private Boolean trustCerts; @UriParam(label = "consumer") - private String namespaceName; + private String namespace; @UriParam(label = "consumer", defaultValue = "1") private int poolSize = 1; @@ -279,16 +279,33 @@ public class KubernetesConfiguration { } /** - * The namespace name + * The namespace */ + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + + /** + * @deprecated use {@link #getNamespace()} + */ + @Deprecated public String getNamespaceName() { - return namespaceName; + return getNamespace(); } - public void setNamespaceName(String namespaceName) { - this.namespaceName = namespaceName; + /** + * @deprecated use {@link #setNamespace(String)} + */ + @Deprecated + public void setNamespaceName(String namespace) { + setNamespace(namespace); } - + /** * The Consumer pool size */ @@ -308,6 +325,6 @@ public class KubernetesConfiguration { + ", clientCertData=" + clientCertData + ", clientCertFile=" + clientCertFile + ", clientKeyAlgo=" + clientKeyAlgo + ", clientKeyData=" + clientKeyData + ", 
clientKeyFile=" + clientKeyFile + ", clientKeyPassphrase=" + clientKeyPassphrase + ", oauthToken=" + oauthToken + ", trustCerts=" - + trustCerts + ", namespaceName=" + namespaceName + "]"; + + trustCerts + ", namespaceName=" + namespace + "]"; } } http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java index e3d6d08..4786ede 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java @@ -77,9 +77,9 @@ public class KubernetesNamespacesConsumer extends DefaultConsumer { @Override public void run() { if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) { getEndpoint().getKubernetesClient().namespaces() - .withName(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .withName(getEndpoint().getKubernetesConfiguration().getNamespace()) .watch(new Watcher<Namespace>() { @Override http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java ---------------------------------------------------------------------- diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java index b92d05d..727d7f4 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java @@ -77,9 +77,9 @@ public class KubernetesPodsConsumer extends DefaultConsumer { @Override public void run() { if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) { getEndpoint().getKubernetesClient().pods() - .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace()) .watch(new Watcher<Pod>() { @Override http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java index da577b7..03f9be3 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java 
@@ -78,9 +78,9 @@ public class KubernetesReplicationControllersConsumer extends DefaultConsumer { @Override public void run() { if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) { getEndpoint().getKubernetesClient().replicationControllers() - .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace()) .watch(new Watcher<ReplicationController>() { @Override http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java index daadb11..a677945 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java @@ -78,9 +78,9 @@ public class KubernetesSecretsConsumer extends DefaultConsumer { @Override public void run() { if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) { getEndpoint().getKubernetesClient().secrets() - .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + 
.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace()) .watch(new Watcher<Secret>() { @Override http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java index d8b28ae..b51746f 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java @@ -77,9 +77,9 @@ public class KubernetesServicesConsumer extends DefaultConsumer { @Override public void run() { if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) { getEndpoint().getKubernetesClient().services() - .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace()) .watch(new Watcher<Service>() { @Override http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java index f81c08d..9ee2ca9 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java @@ -71,6 +71,11 @@ public class KubernetesProcessorFactory implements ProcessorFactory { KubernetesConfiguration kc = new KubernetesConfiguration(); IntrospectionSupport.setProperties(kc, parameters); + // use namespace from config if not provided + if (namespace == null) { + namespace = kc.getNamespace(); + } + return new KubernetesServiceProcessor(name, namespace, uri, mep, kc); } else { return null; http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java index 420cdc3..574db72 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java @@ -48,6 +48,7 @@ public class KubernetesServiceDiscovery extends ServiceSupport { } public List<Server> getUpdatedListOfServers() { + LOG.debug("Discovering endpoints from namespace: {} with name: {}", namespace, name); Endpoints endpoints = client.endpoints().inNamespace(namespace).withName(name).get(); List<Server> result = new ArrayList<Server>(); if (endpoints != null) { 
http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java index 3c9c231..4307960 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java @@ -70,10 +70,19 @@ public class KubernetesServiceProcessor extends ServiceSupport implements AsyncP public boolean process(Exchange exchange, AsyncCallback callback) { // TODO: in try .. catch and the callback stuff - List<Server> services = discovery.getUpdatedListOfServers(); - // apply strategy to pick a service - if (services.isEmpty()) { - exchange.setException(new RejectedExecutionException("No active services with name " + name + " in namespace " + namespace)); + List<Server> services = null; + try { + services = discovery.getUpdatedListOfServers(); + if (services == null || services.isEmpty()) { + exchange.setException(new RejectedExecutionException("No active services with name " + name + " in namespace " + namespace)); + } + } catch (Throwable e) { + exchange.setException(e); + } + + if (exchange.getException() != null) { + callback.done(true); + return true; } // what strategy to use? 
random @@ -113,6 +122,10 @@ public class KubernetesServiceProcessor extends ServiceSupport implements AsyncP @Override protected void doStart() throws Exception { + ObjectHelper.notEmpty(name, "name", this); + ObjectHelper.notEmpty(namespace, "namespace", this); + ObjectHelper.notEmpty(configuration.getMasterUrl(), "masterUrl", this); + discovery = new KubernetesServiceDiscovery(name, namespace, null, createKubernetesClient()); ServiceHelper.startService(discovery); } http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ToServiceRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ToServiceRouteTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ToServiceRouteTest.java new file mode 100644 index 0000000..a7d51fc --- /dev/null +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ToServiceRouteTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kubernetes.processor; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.model.ToServiceConfigurationDefinition; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class ToServiceRouteTest extends CamelTestSupport { + + @Test + public void testToService() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + ToServiceConfigurationDefinition config = new ToServiceConfigurationDefinition(); + config.setMasterUrl("https://fabric8-master.vagrant.f8:8443"); + config.setUsername("admin"); + config.setPassword("admin"); +// config.setOauthToken("eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJkZWZhdWx0Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZWNyZXQubmFtZSI6ImZhYnJpYzgtdG9rZW4tZzNsdGoiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoiZmFicmljOCIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6ImU0NGJhYzA0LWZmYjQtMTFlNS05MWM0LTA4MDAyN2I1YzJmNCIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpkZWZhdWx0OmZhYnJpYzgifQ.yqhevtuqliAV7RlRhaSjG8oFSOn2V1vfmj5V9JKpaOCWbWXMYS0y_v4QPfI4vIGsJtpZgasrt-8brkiOkq7zx0BJxVm-Ae5QIE1uJNeWFYcno823SUV2ebHykhp0eUEtCmWtHByBIoTTF8dG3NZ6jWow7KVGN289Y2ryi8QoYupfQ9ABddVVcduolStIqBu3pu-dJqIvlt6L8wE6AHfhS4uSaPwcimbs5hrg6gB_iONCSCSayhOyiT6fNlXdpxndRRBg9MP3X3f4dD3kDyHE0860HzqZ05jFIwGfV_rbFJeNY3SLDQNO_QFXqUZKg01OH-OJaqDSjuV48P9b6n4uHA"); + config.setNamespace("default"); + + from("direct:start") + .toService("cdi-camel-jetty", "http:cdi-camel-jetty", config) + .to("mock:result"); + } + }; + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/cb7c101b/components/camel-kubernetes/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/resources/log4j.properties b/components/camel-kubernetes/src/test/resources/log4j.properties index 255fe5c..767860e 100644 --- a/components/camel-kubernetes/src/test/resources/log4j.properties +++ b/components/camel-kubernetes/src/test/resources/log4j.properties @@ -18,9 +18,9 @@ # # The logging properties used # -log4j.rootLogger=INFO, file +log4j.rootLogger=INFO, out -#log4j.logger.org.apache.camel=DEBUG +log4j.logger.org.apache.camel.component.kubernetes=DEBUG # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender