CAMEL-9683: A new toService EIP that uses a client discovery to lookup alive services and pick a service ip/port to use when calling the service from Camel route. Allows to plugin different providers.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b50c98e8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b50c98e8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b50c98e8 Branch: refs/heads/remoteServiceCall Commit: b50c98e85b3286160ae8f73df0a3542647ddc6c6 Parents: 2d33838 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Apr 26 12:00:01 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 23 09:23:57 2016 +0200 ---------------------------------------------------------------------- .../ServiceCallConfigurationDefinition.java | 36 +++++++++++++++ .../camel/model/ServiceCallDefinition.java | 46 ++++++++++++++++---- .../camel/spi/ServiceCallLoadBalancer.java | 34 +++++++++++++++ .../processor/KubernetesProcessorFactory.java | 17 +++++++- .../KubernetesServiceCallProcessor.java | 18 +++++++- .../processor/KubernetesServiceDiscovery.java | 2 +- .../processor/RandomLoadBalancer.java | 19 +++++--- .../component/kubernetes/processor/Server.java | 5 ++- .../processor/ServiceCallLoadBalancer.java | 25 ----------- 9 files changed, 159 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b50c98e8/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java index 2363a63..e84f554 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java @@ -23,6 +23,7 @@ import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.ServiceCallLoadBalancer; @Metadata(label = "eip,routing") @XmlRootElement(name = "toServiceConfiguration") @@ -61,6 +62,10 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType { private String clientKeyPassphrase; @XmlAttribute @Metadata(label = "security") private Boolean trustCerts; + @XmlAttribute + private String loadBalancerRef; + @XmlTransient + private ServiceCallLoadBalancer loadBalancer; public ServiceCallConfigurationDefinition() { } @@ -192,6 +197,21 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType { this.trustCerts = trustCerts; } + public String getLoadBalancerRef() { + return loadBalancerRef; + } + + public void setLoadBalancerRef(String loadBalancerRef) { + this.loadBalancerRef = loadBalancerRef; + } + + public ServiceCallLoadBalancer getLoadBalancer() { + return loadBalancer; + } + + public void setLoadBalancer(ServiceCallLoadBalancer loadBalancer) { + this.loadBalancer = loadBalancer; + } // Fluent API // ------------------------------------------------------------------------- @@ -317,6 +337,22 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType { } /** + * Sets a reference to a custom {@link org.apache.camel.spi.ServiceCallLoadBalancer} to use. + */ + public ServiceCallConfigurationDefinition loadBalancer(String loadBalancerRef) { + setLoadBalancerRef(loadBalancerRef); + return this; + } + + /** + * Sets a custom {@link org.apache.camel.spi.ServiceCallLoadBalancer} to use. + */ + public ServiceCallConfigurationDefinition loadBalancer(ServiceCallLoadBalancer loadBalancer) { + setLoadBalancer(loadBalancer); + return this; + } + + /** * End of configuration */ public ServiceCallDefinition end() { http://git-wip-us.apache.org/repos/asf/camel/blob/b50c98e8/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java index 7b2d1a5..d1c154d 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java @@ -21,19 +21,19 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.ServiceCallLoadBalancer; @Metadata(label = "eip,routing") @XmlRootElement(name = "serviceCall") @XmlAccessorType(XmlAccessType.FIELD) public class ServiceCallDefinition extends NoOutputDefinition<ServiceCallDefinition> { - // TODO: load balancing strategy - @XmlElement private ServiceCallConfigurationDefinition serviceCallConfiguration; @XmlAttribute @Metadata(required = "true") @@ -48,16 +48,14 @@ public class ServiceCallDefinition extends NoOutputDefinition<ServiceCallDefinit private String discovery; @XmlAttribute private String serviceCallConfigurationRef; + @XmlAttribute + private String loadBalancerRef; + @XmlTransient + private ServiceCallLoadBalancer loadBalancer; public ServiceCallDefinition() { } - // serviceCall("myService") (will use http by default) - // serviceCall("myService/foo") (will use http by default) - // serviceCall("http:myService/foo") - // serviceCall("myService", "http:myService.host:myService.port/foo") - // serviceCall("myService", "netty4:tcp:myService?connectTimeout=1000") - @Override public String toString() { return "ServiceCall[" + name + "]"; @@ -136,6 +134,22 @@ public class ServiceCallDefinition extends NoOutputDefinition<ServiceCallDefinit return this; } + /** + * Sets a reference to a custom {@link org.apache.camel.spi.ServiceCallLoadBalancer} to use. + */ + public ServiceCallDefinition loadBalancer(String loadBalancerRef) { + setLoadBalancerRef(loadBalancerRef); + return this; + } + + /** + * Sets a custom {@link org.apache.camel.spi.ServiceCallLoadBalancer} to use. + */ + public ServiceCallDefinition loadBalancer(ServiceCallLoadBalancer loadBalancer) { + setLoadBalancer(loadBalancer); + return this; + } + // Properties // ------------------------------------------------------------------------- @@ -198,4 +212,20 @@ public class ServiceCallDefinition extends NoOutputDefinition<ServiceCallDefinit public void setUri(String uri) { this.uri = uri; } + + public String getLoadBalancerRef() { + return loadBalancerRef; + } + + public void setLoadBalancerRef(String loadBalancerRef) { + this.loadBalancerRef = loadBalancerRef; + } + + public ServiceCallLoadBalancer getLoadBalancer() { + return loadBalancer; + } + + public void setLoadBalancer(ServiceCallLoadBalancer loadBalancer) { + this.loadBalancer = loadBalancer; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/b50c98e8/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java new file mode 100644 index 0000000..d9d7650 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import java.util.Collection; + +/** + * Allows SPIs to implement custom load balacing strategies for the Service Call EIP. + */ +public interface ServiceCallLoadBalancer<T> { + + /** + * Chooses one of the servers to use using the implemented strategy. + * + * @param servers list of servers + * @return the choosen server to use. + */ + T chooseServer(Collection<T> servers); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b50c98e8/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java index 13f77e2..f0b231b 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java @@ -27,6 +27,7 @@ import org.apache.camel.model.ServiceCallConfigurationDefinition; import org.apache.camel.model.ServiceCallDefinition; import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.ServiceCallLoadBalancer; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.IntrospectionSupport; @@ -79,7 +80,21 @@ public class KubernetesProcessorFactory implements ProcessorFactory { namespace = kc.getNamespace(); } - return new KubernetesServiceCallProcessor(name, namespace, uri, mep, kc); + // lookup the load balancer to use + ServiceCallLoadBalancer lb = ts.getLoadBalancer(); + if (lb == null && ts.getServiceCallConfigurationRef() != null) { + lb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ts.getLoadBalancerRef(), ServiceCallLoadBalancer.class); + } + if (lb == null && config != null) { + lb = config.getLoadBalancer(); + } + if (lb == null && configRef != null) { + lb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), configRef.getLoadBalancerRef(), ServiceCallLoadBalancer.class); + } + + KubernetesServiceCallProcessor processor = new KubernetesServiceCallProcessor(name, namespace, uri, mep, kc); + processor.setLoadBalancer(lb); + return processor; } else { return null; } http://git-wip-us.apache.org/repos/asf/camel/blob/b50c98e8/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java index 8bdfe09..ff859d6 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java @@ -34,6 +34,7 @@ import org.apache.camel.component.kubernetes.KubernetesConfiguration; import org.apache.camel.component.kubernetes.KubernetesConstants; import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.ServiceCallLoadBalancer; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ObjectHelper; @@ -59,7 +60,7 @@ public class KubernetesServiceCallProcessor extends ServiceSupport implements As private final KubernetesConfiguration configuration; private KubernetesServiceDiscovery discovery; - private ServiceCallLoadBalancer loadBalancer = new RandomLoadBalancer(); + private ServiceCallLoadBalancer<Server> loadBalancer; private final ServiceCallExpression serviceCallExpression; private SendDynamicProcessor processor; @@ -116,7 +117,7 @@ public class KubernetesServiceCallProcessor extends ServiceSupport implements As } // let the client load balancer chose which server to use - Server server = loadBalancer.choseServer(servers); + Server server = loadBalancer.chooseServer(servers); String ip = server.getIp(); int port = server.getPort(); LOG.debug("Random selected service {} active at server: {}:{}", name, ip, port); @@ -154,12 +155,25 @@ public class KubernetesServiceCallProcessor extends ServiceSupport implements As return "kubernetes"; } + public ServiceCallLoadBalancer<Server> getLoadBalancer() { + return loadBalancer; + } + + public void setLoadBalancer(ServiceCallLoadBalancer<Server> loadBalancer) { + this.loadBalancer = loadBalancer; + } + @Override protected void doStart() throws Exception { ObjectHelper.notEmpty(name, "name", this); ObjectHelper.notEmpty(namespace, "namespace", this); ObjectHelper.notEmpty(configuration.getMasterUrl(), "masterUrl", this); + if (loadBalancer == null) { + loadBalancer = new RandomLoadBalancer(); + } + LOG.info("KubernetesServiceCall at namespace: {} with service name: {} is using load balancer: {}", namespace, name, loadBalancer); + discovery = new KubernetesServiceDiscovery(name, namespace, null, createKubernetesClient()); processor = new SendDynamicProcessor(uri, serviceCallExpression); processor.setCamelContext(getCamelContext()); http://git-wip-us.apache.org/repos/asf/camel/blob/b50c98e8/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java index ff44e44..75590f7 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java @@ -31,7 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Discovers services in Kubernetes. + * Discovers where services in Kubernetes are running on which servers. */ public class KubernetesServiceDiscovery extends ServiceSupport { http://git-wip-us.apache.org/repos/asf/camel/blob/b50c98e8/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java index 86a9b6c..5724098 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java @@ -16,16 +16,25 @@ */ package org.apache.camel.component.kubernetes.processor; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Random; -public class RandomLoadBalancer implements ServiceCallLoadBalancer { +import org.apache.camel.spi.ServiceCallLoadBalancer; + +public class RandomLoadBalancer implements ServiceCallLoadBalancer<Server> { @Override - public Server choseServer(List<Server> services) { - int size = services.size(); + public Server chooseServer(Collection<Server> servers) { + List<Server> list = new ArrayList<>(servers); + int size = list.size(); int ran = new Random().nextInt(size); - Server server = services.get(ran); - return server; + return list.get(ran); + } + + @Override + public String toString() { + return "RandomLoadBalancer"; } } http://git-wip-us.apache.org/repos/asf/camel/blob/b50c98e8/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java index d0a4471..6a6a913 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java @@ -16,7 +16,10 @@ */ package org.apache.camel.component.kubernetes.processor; -public class Server { +/** + * Represents a model of a kubernetes server. + */ +public final class Server { private final String ip; private final int port; http://git-wip-us.apache.org/repos/asf/camel/blob/b50c98e8/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallLoadBalancer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallLoadBalancer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallLoadBalancer.java deleted file mode 100644 index f2dc7ad..0000000 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallLoadBalancer.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.kubernetes.processor; - -import java.util.List; - -public interface ServiceCallLoadBalancer { - - Server choseServer(List<Server> services); - -}