Repository: camel Updated Branches: refs/heads/kube-lb [created] 14786f68e
CAMEL-9683: A new toService EIP that uses a client discovery to lookup alive services and pick a service ip/port to use when calling the service from Camel route. Allows to plugin different providers. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5bb0ba9d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5bb0ba9d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5bb0ba9d Branch: refs/heads/kube-lb Commit: 5bb0ba9d9ee3ed5c12c7177c1176c09b25785f8d Parents: 3eb94f4 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Apr 22 14:14:49 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Apr 22 14:14:49 2016 +0200 ---------------------------------------------------------------------- .../apache/camel/model/ProcessorDefinition.java | 16 + .../model/ToServiceConfigurationDefinition.java | 308 +++++++++++++++++++ .../apache/camel/model/ToServiceDefinition.java | 201 ++++++++++++ .../resources/org/apache/camel/model/jaxb.index | 2 + .../kubernetes/KubernetesEndpoint.java | 21 +- .../processor/KubernetesProcessorFactory.java | 80 +++++ .../processor/KubernetesServiceDiscovery.java | 90 ++++++ .../processor/KubernetesServiceProcessor.java | 173 +++++++++++ .../component/kubernetes/processor/Server.java | 36 +++ .../org/apache/camel/model/ToServiceDefinition | 18 ++ 10 files changed, 933 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5bb0ba9d/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 2270ef3..2acf785 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ 
b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -654,6 +654,22 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * Sends the exchange to the given service + * + * @param name the service name + * @param uri the endpoint uri to use for calling the service + * @return the builder + */ + @SuppressWarnings("unchecked") + public Type toService(String name, String uri) { + ToServiceDefinition answer = new ToServiceDefinition(); + answer.setName(name); + answer.setUri(uri); + addOutput(answer); + return (Type) this; + } + + /** * Sends the exchange to the given endpoint * * @param endpoint the endpoint to send to http://git-wip-us.apache.org/repos/asf/camel/blob/5bb0ba9d/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java new file mode 100644 index 0000000..53c63f2 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.model; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlTransient; + +import org.apache.camel.spi.Metadata; + +@Metadata(label = "eip,routing") +@XmlRootElement(name = "toServiceConfiguration") +@XmlAccessorType(XmlAccessType.FIELD) +public class ToServiceConfigurationDefinition extends IdentifiedType { + + @XmlTransient + private ToServiceDefinition parent; + @XmlAttribute @Metadata(required = "true") + private String masterUrl; + @XmlAttribute + private String apiVersion; + @XmlAttribute @Metadata(label = "security") + private String username; + @XmlAttribute @Metadata(label = "security") + private String password; + @XmlAttribute @Metadata(label = "security") + private String oauthToken; + @XmlAttribute @Metadata(label = "security") + private String caCertData; + @XmlAttribute @Metadata(label = "security") + private String caCertFile; + @XmlAttribute @Metadata(label = "security") + private String clientCertData; + @XmlAttribute @Metadata(label = "security") + private String clientCertFile; + @XmlAttribute @Metadata(label = "security") + private String clientKeyAlgo; + @XmlAttribute @Metadata(label = "security") + private String clientKeyData; + @XmlAttribute @Metadata(label = "security") + private String clientKeyFile; + @XmlAttribute @Metadata(label = "security") + private String clientKeyPassphrase; + @XmlAttribute @Metadata(label = "security") + private Boolean trustCerts; + + public ToServiceConfigurationDefinition() { + } + + public ToServiceConfigurationDefinition(ToServiceDefinition parent) { + this.parent = parent; + } + + // Getter/Setter + // ------------------------------------------------------------------------- + + public String 
getMasterUrl() { + return masterUrl; + } + + public void setMasterUrl(String masterUrl) { + this.masterUrl = masterUrl; + } + + public String getApiVersion() { + return apiVersion; + } + + public void setApiVersion(String apiVersion) { + this.apiVersion = apiVersion; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getCaCertData() { + return caCertData; + } + + public void setCaCertData(String caCertData) { + this.caCertData = caCertData; + } + + public String getCaCertFile() { + return caCertFile; + } + + public void setCaCertFile(String caCertFile) { + this.caCertFile = caCertFile; + } + + public String getClientCertData() { + return clientCertData; + } + + public void setClientCertData(String clientCertData) { + this.clientCertData = clientCertData; + } + + public String getClientCertFile() { + return clientCertFile; + } + + public void setClientCertFile(String clientCertFile) { + this.clientCertFile = clientCertFile; + } + + public String getClientKeyAlgo() { + return clientKeyAlgo; + } + + public void setClientKeyAlgo(String clientKeyAlgo) { + this.clientKeyAlgo = clientKeyAlgo; + } + + public String getClientKeyData() { + return clientKeyData; + } + + public void setClientKeyData(String clientKeyData) { + this.clientKeyData = clientKeyData; + } + + public String getClientKeyFile() { + return clientKeyFile; + } + + public void setClientKeyFile(String clientKeyFile) { + this.clientKeyFile = clientKeyFile; + } + + public String getClientKeyPassphrase() { + return clientKeyPassphrase; + } + + public void setClientKeyPassphrase(String clientKeyPassphrase) { + this.clientKeyPassphrase = clientKeyPassphrase; + } + + public String getOauthToken() { + return oauthToken; + } + + public void setOauthToken(String oauthToken) { + 
this.oauthToken = oauthToken; + } + + public Boolean getTrustCerts() { + return trustCerts; + } + + public void setTrustCerts(Boolean trustCerts) { + this.trustCerts = trustCerts; + } + + + // Fluent API + // ------------------------------------------------------------------------- + + /** + * Sets the URL to the master + */ + public ToServiceConfigurationDefinition masterUrl(String masterUrl) { + setMasterUrl(masterUrl); + return this; + } + + /** + * Sets the API version + */ + public ToServiceConfigurationDefinition apiVersion(String apiVersion) { + setApiVersion(apiVersion); + return this; + } + + /** + * Sets the username for authentication + */ + public ToServiceConfigurationDefinition username(String username) { + setUsername(username); + return this; + } + + /** + * Sets the password for authentication + */ + public ToServiceConfigurationDefinition password(String password) { + setPassword(password); + return this; + } + + /** + * Sets the OAUTH token for authentication (instead of username/password) + */ + public ToServiceConfigurationDefinition oauthToken(String oauthToken) { + setOauthToken(oauthToken); + return this; + } + + /** + * Sets the Certificate Authority data + */ + public ToServiceConfigurationDefinition caCertData(String caCertData) { + setCaCertData(caCertData); + return this; + } + + /** + * Sets the Certificate Authority data that are loaded from the file + */ + public ToServiceConfigurationDefinition caCertFile(String caCertFile) { + setCaCertFile(caCertFile); + return this; + } + + /** + * Sets the Client Certificate data + */ + public ToServiceConfigurationDefinition clientCertData(String clientCertData) { + setClientCertData(clientCertData); + return this; + } + + /** + * Sets the Client Certificate data that are loaded from the file + */ + public ToServiceConfigurationDefinition clientCertFile(String clientCertFile) { + setClientCertFile(clientCertFile); + return this; + } + + /** + * Sets the Client Keystore algorithm, such as RSA. 
+ */ + public ToServiceConfigurationDefinition clientKeyAlgo(String clientKeyAlgo) { + setClientKeyAlgo(clientKeyAlgo); + return this; + } + + /** + * Sets the Client Keystore data + */ + public ToServiceConfigurationDefinition clientKeyData(String clientKeyData) { + setClientKeyData(clientKeyData); + return this; + } + + /** + * Sets the Client Keystore data that are loaded from the file + */ + public ToServiceConfigurationDefinition clientKeyFile(String clientKeyFile) { + setClientKeyFile(clientKeyFile); + return this; + } + + /** + * Sets the Client Keystore passphrase + */ + public ToServiceConfigurationDefinition clientKeyPassphrase(String clientKeyPassphrase) { + setClientKeyPassphrase(clientKeyPassphrase); + return this; + } + + /** + * Sets whether to turn on trust certificate check + */ + public ToServiceConfigurationDefinition trustCerts(boolean trustCerts) { + setTrustCerts(trustCerts); + return this; + } + + /** + * End of configuration + */ + public ToServiceDefinition end() { + return parent; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/5bb0ba9d/camel-core/src/main/java/org/apache/camel/model/ToServiceDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ToServiceDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ToServiceDefinition.java new file mode 100644 index 0000000..1c5dfc4 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/model/ToServiceDefinition.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.model; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.RouteContext; + +@Metadata(label = "eip,routing") +@XmlRootElement(name = "toService") +@XmlAccessorType(XmlAccessType.FIELD) +public class ToServiceDefinition extends NoOutputDefinition<ToServiceDefinition> { + + // TODO: load balancing strategy + + @XmlElement + private ToServiceConfigurationDefinition toServiceConfiguration; + @XmlAttribute @Metadata(required = "true") + private String uri; + @XmlAttribute + private ExchangePattern pattern; + @XmlAttribute @Metadata(defaultValue = "default") + private String namespace; + @XmlAttribute @Metadata(required = "true") + private String name; + @XmlAttribute + private String discovery; + @XmlAttribute + private String toServiceConfigurationRef; + + public ToServiceDefinition() { + } + + // toService("myService") (will use http by default) + // toService("myService/foo") (will use http by default) + // toService("http:myService/foo") + // toService("myService", "http:myService.host:myService.port/foo") + // toService("myService", "netty4:tcp:myService?connectTimeout=1000") + + @Override + public String toString() { + return "ToService[" + name + "]"; + } + + @Override + public String 
getLabel() { + return "toService"; + } + + @Override + public Processor createProcessor(RouteContext routeContext) throws Exception { + if (discovery != null) { + throw new IllegalStateException("Cannot find Camel component on the classpath implementing the discovery provider: " + discovery); + } else { + throw new IllegalStateException("Cannot find Camel component supporting the ToService EIP. Add camel-kubernetes if you are using Kubernetes."); + } + } + + // Fluent API + // ------------------------------------------------------------------------- + + /** + * Sets the optional {@link ExchangePattern} used to invoke this endpoint + */ + public ToServiceDefinition pattern(ExchangePattern pattern) { + setPattern(pattern); + return this; + } + + /** + * Sets the namespace of the service to use + */ + public ToServiceDefinition namespace(String namespace) { + setNamespace(namespace); + return this; + } + + /** + * Sets the name of the service to use + */ + public ToServiceDefinition name(String name) { + setName(name); + return this; + } + + /** + * Sets the discovery provider to use. + * <p/> + * Use kubernetes to use kubernetes. + * Use ribbon to use ribbon. + */ + public ToServiceDefinition discovery(String discovery) { + setDiscovery(discovery); + return this; + } + + public ToServiceConfigurationDefinition toServiceConfiguration() { + toServiceConfiguration = new ToServiceConfigurationDefinition(this); + return toServiceConfiguration; + } + + /** + * Configures the ToService EIP using the given configuration + */ + public ToServiceDefinition toServiceConfiguration(ToServiceConfigurationDefinition configuration) { + toServiceConfiguration = configuration; + return this; + } + + /** + * Refers to a ToService configuration to use for configuring the ToService EIP. 
+ */ + public ToServiceDefinition toServiceConfiguration(String ref) { + toServiceConfigurationRef = ref; + return this; + } + + // Properties + // ------------------------------------------------------------------------- + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public ExchangePattern getPattern() { + return pattern; + } + + public void setPattern(ExchangePattern pattern) { + this.pattern = pattern; + } + + public String getDiscovery() { + return discovery; + } + + public void setDiscovery(String discovery) { + this.discovery = discovery; + } + + public ToServiceConfigurationDefinition getToServiceConfiguration() { + return toServiceConfiguration; + } + + public void setToServiceConfiguration(ToServiceConfigurationDefinition toServiceConfiguration) { + this.toServiceConfiguration = toServiceConfiguration; + } + + public String getToServiceConfigurationRef() { + return toServiceConfigurationRef; + } + + public void setToServiceConfigurationRef(String toServiceConfigurationRef) { + this.toServiceConfigurationRef = toServiceConfigurationRef; + } + + public String getUri() { + return uri; + } + + /** + * The uri of the endpoint to send to. + * The uri can be dynamically computed using the {@link org.apache.camel.language.simple.SimpleLanguage} expression. 
+ */ + public void setUri(String uri) { + this.uri = uri; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5bb0ba9d/camel-core/src/main/resources/org/apache/camel/model/jaxb.index ---------------------------------------------------------------------- diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index index 9859736..549557f 100644 --- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index +++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index @@ -89,6 +89,8 @@ ThrottleDefinition ThrowExceptionDefinition ToDefinition ToDynamicDefinition +ToServiceDefinition +ToServiceConfigurationDefinition TransactedDefinition TransformDefinition TryDefinition http://git-wip-us.apache.org/repos/asf/camel/blob/5bb0ba9d/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java index 86dd081..0d444f0 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java @@ -21,7 +21,7 @@ import java.util.concurrent.ExecutorService; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; import io.fabric8.kubernetes.client.DefaultKubernetesClient; - +import io.fabric8.kubernetes.client.KubernetesClient; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -60,7 +60,7 @@ public class KubernetesEndpoint extends DefaultEndpoint { @UriParam private KubernetesConfiguration configuration; - 
private DefaultKubernetesClient client; + private transient KubernetesClient client; public KubernetesEndpoint(String uri, KubernetesComponent component, KubernetesConfiguration config) { super(uri, component); @@ -156,22 +156,22 @@ public class KubernetesEndpoint extends DefaultEndpoint { @Override protected void doStart() throws Exception { super.doStart(); - - client = configuration.getKubernetesClient() != null ? configuration.getKubernetesClient() - : createKubernetesClient(); + client = configuration.getKubernetesClient() != null ? configuration.getKubernetesClient() : createKubernetesClient(); } @Override protected void doStop() throws Exception { super.doStop(); - client.close(); + if (client != null) { + client.close(); + } } public ExecutorService createExecutor() { return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KubernetesConsumer", configuration.getPoolSize()); } - public DefaultKubernetesClient getKubernetesClient() { + public KubernetesClient getKubernetesClient() { return client; } @@ -182,10 +182,9 @@ public class KubernetesEndpoint extends DefaultEndpoint { return configuration; } - private DefaultKubernetesClient createKubernetesClient() { + private KubernetesClient createKubernetesClient() { LOG.debug("Create Kubernetes client with the following Configuration: " + configuration.toString()); - DefaultKubernetesClient kubeClient = new DefaultKubernetesClient(); ConfigBuilder builder = new ConfigBuilder(); builder.withMasterUrl(configuration.getMasterUrl()); if ((ObjectHelper.isNotEmpty(configuration.getUsername()) @@ -228,8 +227,6 @@ public class KubernetesEndpoint extends DefaultEndpoint { } Config conf = builder.build(); - - kubeClient = new DefaultKubernetesClient(conf); - return kubeClient; + return new DefaultKubernetesClient(conf); } } 
http://git-wip-us.apache.org/repos/asf/camel/blob/5bb0ba9d/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java new file mode 100644 index 0000000..f81c08d --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kubernetes.processor; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.component.kubernetes.KubernetesConfiguration; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.ToServiceConfigurationDefinition; +import org.apache.camel.model.ToServiceDefinition; +import org.apache.camel.spi.ProcessorFactory; +import org.apache.camel.spi.RouteContext; +import org.apache.camel.util.CamelContextHelper; +import org.apache.camel.util.IntrospectionSupport; + +public class KubernetesProcessorFactory implements ProcessorFactory { + + @Override + public Processor createChildProcessor(RouteContext routeContext, ProcessorDefinition<?> definition, boolean mandatory) throws Exception { + // not in use + return null; + } + + @Override + public Processor createProcessor(RouteContext routeContext, ProcessorDefinition<?> definition) throws Exception { + if (definition instanceof ToServiceDefinition) { + ToServiceDefinition ts = (ToServiceDefinition) definition; + + // discovery must either not be set, or if set then must be us + if (ts.getDiscovery() != null && !"kubernetes".equals(ts.getDiscovery())) { + return null; + } + + String name = ts.getName(); + String namespace = ts.getNamespace(); + String uri = ts.getUri(); + ExchangePattern mep = ts.getPattern(); + + ToServiceConfigurationDefinition config = ts.getToServiceConfiguration(); + ToServiceConfigurationDefinition configRef = null; + if (ts.getToServiceConfigurationRef() != null) { + configRef = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ts.getToServiceConfigurationRef(), ToServiceConfigurationDefinition.class); + } + + // extract the properties from the configuration from the model + Map<String, Object> parameters = new HashMap<>(); + if (configRef != null) { + IntrospectionSupport.getProperties(configRef, parameters, null); + } + 
if (config != null) { + IntrospectionSupport.getProperties(config, parameters, null); + } + // and set them on the kubernetes configuration class + KubernetesConfiguration kc = new KubernetesConfiguration(); + IntrospectionSupport.setProperties(kc, parameters); + + return new KubernetesServiceProcessor(name, namespace, uri, mep, kc); + } else { + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/5bb0ba9d/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java new file mode 100644 index 0000000..420cdc3 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kubernetes.processor; + +import java.util.ArrayList; +import java.util.List; + +import io.fabric8.kubernetes.api.model.EndpointAddress; +import io.fabric8.kubernetes.api.model.EndpointPort; +import io.fabric8.kubernetes.api.model.EndpointSubset; +import io.fabric8.kubernetes.api.model.Endpoints; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KubernetesServiceDiscovery extends ServiceSupport { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceDiscovery.class); + private static final int FIRST = 0; + + private String name; + private String namespace; + private String portName; + private KubernetesClient client; + + public KubernetesServiceDiscovery(String name, String namespace, String portName, KubernetesClient client) { + this.name = name; + this.namespace = namespace; + this.portName = portName; + this.client = client; + } + + public List<Server> getUpdatedListOfServers() { + Endpoints endpoints = client.endpoints().inNamespace(namespace).withName(name).get(); + List<Server> result = new ArrayList<Server>(); + if (endpoints != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found [" + endpoints.getSubsets().size() + "] endpoints in namespace [" + + namespace + "] for name [" + name + "] and portName [" + portName + "]"); + } + for (EndpointSubset subset : endpoints.getSubsets()) { + if (subset.getPorts().size() == 1) { + EndpointPort port = subset.getPorts().get(FIRST); + for (EndpointAddress address : subset.getAddresses()) { + result.add(new Server(address.getIp(), port.getPort())); + } + } else { + for (EndpointPort port : subset.getPorts()) { + if (ObjectHelper.isEmpty(portName) || portName.endsWith(port.getName())) { + for (EndpointAddress address : subset.getAddresses()) { + 
result.add(new Server(address.getIp(), port.getPort())); + } + } + } + } + } + } + + return result; + } + + @Override + protected void doStart() throws Exception { + // noop + } + + @Override + protected void doStop() throws Exception { + if (client != null) { + IOHelper.close(client); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5bb0ba9d/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java new file mode 100644 index 0000000..3c9c231 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kubernetes.processor; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.RejectedExecutionException; + +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Traceable; +import org.apache.camel.component.kubernetes.KubernetesConfiguration; +import org.apache.camel.spi.IdAware; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KubernetesServiceProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceProcessor.class); + + private String id; + private final String name; + private final String namespace; + private final String uri; + private final ExchangePattern exchangePattern; + private final KubernetesConfiguration configuration; + + private KubernetesServiceDiscovery discovery; + + // TODO: allow to plugin custom load balancer like ribbon + + public KubernetesServiceProcessor(String name, String namespace, String uri, ExchangePattern exchangePattern, KubernetesConfiguration configuration) { + this.name = name; + this.namespace = namespace; + this.uri = uri; + this.exchangePattern = exchangePattern; + this.configuration = configuration; + } + + @Override + public void process(Exchange exchange) throws Exception { + AsyncProcessorHelper.process(this, exchange); + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + 
// TODO: in try .. catch and the callback stuff + + List<Server> services = discovery.getUpdatedListOfServers(); + // apply strategy to pick a service + if (services.isEmpty()) { + exchange.setException(new RejectedExecutionException("No active services with name " + name + " in namespace " + namespace)); + } + + // what strategy to use? random + int size = services.size(); + int ran = new Random().nextInt(size); + Server server = services.get(ran); + + String ip = server.getIp(); + int port = server.getPort(); + + LOG.debug("Random selected service {} active at: {}:{}", name, ip, port); + + // build uri based on the name + + + // TODO: lookup service + // TODO: apply LB strategy + // TODO build uri + callback.done(true); + return true; + } + + @Override + public String getId() { + return id; + } + + @Override + public void setId(String id) { + this.id = id; + } + + @Override + public String getTraceLabel() { + return "kubernetes"; + } + + @Override + protected void doStart() throws Exception { + discovery = new KubernetesServiceDiscovery(name, namespace, null, createKubernetesClient()); + ServiceHelper.startService(discovery); + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopService(discovery); + } + + private KubernetesClient createKubernetesClient() { + LOG.debug("Create Kubernetes client with the following Configuration: " + configuration.toString()); + + ConfigBuilder builder = new ConfigBuilder(); + builder.withMasterUrl(configuration.getMasterUrl()); + if ((ObjectHelper.isNotEmpty(configuration.getUsername()) + && ObjectHelper.isNotEmpty(configuration.getPassword())) + && ObjectHelper.isEmpty(configuration.getOauthToken())) { + builder.withUsername(configuration.getUsername()); + builder.withPassword(configuration.getPassword()); + } else { + builder.withOauthToken(configuration.getOauthToken()); + } + if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) { + builder.withCaCertData(configuration.getCaCertData()); + } + 
if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) { + builder.withCaCertFile(configuration.getCaCertFile()); + } + if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) { + builder.withClientCertData(configuration.getClientCertData()); + } + if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) { + builder.withClientCertFile(configuration.getClientCertFile()); + } + if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) { + builder.withApiVersion(configuration.getApiVersion()); + } + if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) { + builder.withClientKeyAlgo(configuration.getClientKeyAlgo()); + } + if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) { + builder.withClientKeyData(configuration.getClientKeyData()); + } + if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) { + builder.withClientKeyFile(configuration.getClientKeyFile()); + } + if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) { + builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase()); + } + if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) { + builder.withTrustCerts(configuration.getTrustCerts()); + } + + Config conf = builder.build(); + return new DefaultKubernetesClient(conf); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/5bb0ba9d/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java new file mode 100644 index 0000000..021fc86 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.processor; + +public class Server { + + private final String ip; + private final int port; + + public Server(String ip, int port) { + this.ip = ip; + this.port = port; + } + + public String getIp() { + return ip; + } + + public int getPort() { + return port; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5bb0ba9d/components/camel-kubernetes/src/main/resources/META-INF/services/org/apache/camel/model/ToServiceDefinition ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/resources/META-INF/services/org/apache/camel/model/ToServiceDefinition b/components/camel-kubernetes/src/main/resources/META-INF/services/org/apache/camel/model/ToServiceDefinition new file mode 100644 index 0000000..acf5be8 --- /dev/null +++ b/components/camel-kubernetes/src/main/resources/META-INF/services/org/apache/camel/model/ToServiceDefinition @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.kubernetes.processor.KubernetesProcessorFactory