CAMEL-9683: A new toService EIP that uses a client discovery to lookup alive services and pick a service ip/port to use when calling the service from Camel route. Allows to plugin different providers.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8577b501 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8577b501 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8577b501 Branch: refs/heads/remoteServiceCall Commit: 8577b501a7265276b9df917b196536840a65da28 Parents: 02ec0d4 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Apr 26 14:22:28 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 23 09:25:01 2016 +0200 ---------------------------------------------------------------------- .../ServiceCallConfigurationDefinition.java | 37 +++++++ .../camel/model/ServiceCallDefinition.java | 37 +++++++ .../camel/spi/ServiceCallLoadBalancer.java | 6 +- .../org/apache/camel/spi/ServiceCallServer.java | 36 +++++++ .../spi/ServiceCallServerListStrategy.java | 45 ++++++++ .../processor/KubernetesProcessorFactory.java | 33 ++++++ .../kubernetes/processor/KubernetesServer.java | 41 +++++++ .../KubernetesServiceCallProcessor.java | 36 ++++--- ...KubernetesServiceCallServerListStrategy.java | 106 +++++++++++++++++++ .../processor/KubernetesServiceDiscovery.java | 93 ---------------- .../processor/RandomLoadBalancer.java | 6 +- .../processor/RoundRobinBalancer.java | 6 +- .../component/kubernetes/processor/Server.java | 39 ------- 13 files changed, 368 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java index e84f554..cda3338 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java @@ -24,6 +24,7 @@ import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.ServiceCallLoadBalancer; +import org.apache.camel.spi.ServiceCallServerListStrategy; @Metadata(label = "eip,routing") @XmlRootElement(name = "toServiceConfiguration") @@ -66,6 +67,10 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType { private String loadBalancerRef; @XmlTransient private ServiceCallLoadBalancer loadBalancer; + @XmlAttribute + private String serverListStrategyRef; + @XmlTransient + private ServiceCallServerListStrategy serverListStrategy; public ServiceCallConfigurationDefinition() { } @@ -213,6 +218,22 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType { this.loadBalancer = loadBalancer; } + public String getServerListStrategyRef() { + return serverListStrategyRef; + } + + public void setServerListStrategyRef(String serverListStrategyRef) { + this.serverListStrategyRef = serverListStrategyRef; + } + + public ServiceCallServerListStrategy getServerListStrategy() { + return serverListStrategy; + } + + public void setServerListStrategy(ServiceCallServerListStrategy serverListStrategy) { + this.serverListStrategy = serverListStrategy; + } + // Fluent API // ------------------------------------------------------------------------- @@ -353,6 +374,22 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType { } /** + * Sets a reference to a custom {@link org.apache.camel.spi.ServiceCallServerListStrategy} to use. + */ + public ServiceCallConfigurationDefinition serverListStrategy(String serverListStrategyRef) { + setServerListStrategyRef(serverListStrategyRef); + return this; + } + + /** + * Sets a custom {@link org.apache.camel.spi.ServiceCallServerListStrategy} to use. + */ + public ServiceCallConfigurationDefinition serverListStrategy(ServiceCallServerListStrategy serverListStrategy) { + setServerListStrategy(serverListStrategy); + return this; + } + + /** * End of configuration */ public ServiceCallDefinition end() { http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java index d1c154d..1bfb0b5 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java @@ -28,6 +28,7 @@ import org.apache.camel.Processor; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.ServiceCallLoadBalancer; +import org.apache.camel.spi.ServiceCallServerListStrategy; @Metadata(label = "eip,routing") @XmlRootElement(name = "serviceCall") @@ -52,6 +53,10 @@ public class ServiceCallDefinition extends NoOutputDefinition<ServiceCallDefinit private String loadBalancerRef; @XmlTransient private ServiceCallLoadBalancer loadBalancer; + @XmlAttribute + private String serverListStrategyRef; + @XmlTransient + private ServiceCallServerListStrategy serverListStrategy; public ServiceCallDefinition() { } @@ -150,6 +155,22 @@ public class ServiceCallDefinition extends NoOutputDefinition<ServiceCallDefinit return this; } + /** + * Sets a reference to a custom {@link org.apache.camel.spi.ServiceCallServerListStrategy} to use. + */ + public ServiceCallDefinition serverListStrategy(String serverListStrategyRef) { + setServerListStrategyRef(serverListStrategyRef); + return this; + } + + /** + * Sets a custom {@link org.apache.camel.spi.ServiceCallServerListStrategy} to use. + */ + public ServiceCallDefinition serverListStrategy(ServiceCallServerListStrategy serverListStrategy) { + setServerListStrategy(serverListStrategy); + return this; + } + // Properties // ------------------------------------------------------------------------- @@ -228,4 +249,20 @@ public class ServiceCallDefinition extends NoOutputDefinition<ServiceCallDefinit public void setLoadBalancer(ServiceCallLoadBalancer loadBalancer) { this.loadBalancer = loadBalancer; } + + public String getServerListStrategyRef() { + return serverListStrategyRef; + } + + public void setServerListStrategyRef(String serverListStrategyRef) { + this.serverListStrategyRef = serverListStrategyRef; + } + + public ServiceCallServerListStrategy getServerListStrategy() { + return serverListStrategy; + } + + public void setServerListStrategy(ServiceCallServerListStrategy serverListStrategy) { + this.serverListStrategy = serverListStrategy; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java index c7b05db..908fcbb 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java @@ -19,9 +19,11 @@ package org.apache.camel.spi; import java.util.Collection; /** - * Allows SPIs to implement custom load balacing strategies for the Service Call EIP. + * Allows SPIs to implement custom load balancing strategies for the Service Call EIP. + * + * @see ServiceCallServerListStrategy */ -public interface ServiceCallLoadBalancer<T> { +public interface ServiceCallLoadBalancer<T extends ServiceCallServer> { /** * Chooses one of the servers to use using the implemented strategy. http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServer.java b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServer.java new file mode 100644 index 0000000..1579599 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServer.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +/** + * Represents a server that host a service for the Service Call EIP. + * + * @see ServiceCallLoadBalancer + * @see ServiceCallServerListStrategy + */ +public interface ServiceCallServer { + + /** + * Gets the IP or hostname of the server hosting the service + */ + String getIp(); + + /** + * Gets the port number of the server hosting the service + */ + int getPort(); +} http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java new file mode 100644 index 0000000..681a662 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import java.util.Collection; + +/** + * Allows SPIs to implement custom server list strategies for the Service Call EIP. + * + * @see ServiceCallLoadBalancer + * @see ServiceCallServer + */ +public interface ServiceCallServerListStrategy<T extends ServiceCallServer> { + + /** + * Gets the initial list of servers. + * <p/> + * This method may return <tt>null</tt> or an empty list. + */ + Collection<T> getInitialListOfServers(); + + /** + * Gets the updated list of servers. + * <p/> + * This method can either be called on-demand prior to a service call, or have + * a background job that is scheduled to update the list, or a watcher + * that triggers when the list of servers changes. + */ + Collection<T> getUpdatedListOfServers(); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java index 05d979b..7ecc6b2 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java @@ -28,6 +28,7 @@ import org.apache.camel.model.ServiceCallDefinition; import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.ServiceCallLoadBalancer; +import org.apache.camel.spi.ServiceCallServerListStrategy; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.IntrospectionSupport; @@ -90,8 +91,18 @@ public class KubernetesProcessorFactory implements ProcessorFactory { lb = configureLoadBalancer(routeContext, configRef); } + // lookup the server list strategy to use (configured on EIP takes precedence vs configured on configuration) + ServiceCallServerListStrategy sl = configureServerListStrategy(routeContext, sc); + if (sl == null && config != null) { + sl = configureServerListStrategy(routeContext, config); + } + if (sl == null && configRef != null) { + sl = configureServerListStrategy(routeContext, configRef); + } + KubernetesServiceCallProcessor processor = new KubernetesServiceCallProcessor(name, namespace, uri, mep, kc); processor.setLoadBalancer(lb); + processor.setServerListStrategy(sl); return processor; } else { return null; @@ -135,4 +146,26 @@ public class KubernetesProcessorFactory implements ProcessorFactory { return lb; } + private ServiceCallServerListStrategy configureServerListStrategy(RouteContext routeContext, ServiceCallDefinition sd) { + ServiceCallServerListStrategy lb = null; + + if (sd != null) { + lb = sd.getServerListStrategy(); + if (lb == null && sd.getServerListStrategyRef() != null) { + lb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), sd.getServerListStrategyRef(), ServiceCallServerListStrategy.class); + } + } + + return lb; + } + + private ServiceCallServerListStrategy configureServerListStrategy(RouteContext routeContext, ServiceCallConfigurationDefinition config) { + ServiceCallServerListStrategy lb = config.getServerListStrategy(); + if (lb == null && config.getServerListStrategyRef() != null) { + String ref = config.getServerListStrategyRef(); + lb = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ref, ServiceCallServerListStrategy.class); + } + return lb; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServer.java new file mode 100644 index 0000000..007e5c8 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServer.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.processor; + +import org.apache.camel.spi.ServiceCallServer; + +/** + * Represents a model of a kubernetes server. + */ +public final class KubernetesServer implements ServiceCallServer { + + private final String ip; + private final int port; + + public KubernetesServer(String ip, int port) { + this.ip = ip; + this.port = port; + } + + public String getIp() { + return ip; + } + + public int getPort() { + return port; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java index ff859d6..8900017 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java @@ -16,7 +16,7 @@ */ package org.apache.camel.component.kubernetes.processor; -import java.util.List; +import java.util.Collection; import java.util.concurrent.RejectedExecutionException; import io.fabric8.kubernetes.client.Config; @@ -35,6 +35,7 @@ import org.apache.camel.component.kubernetes.KubernetesConstants; import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ServiceCallLoadBalancer; +import org.apache.camel.spi.ServiceCallServerListStrategy; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ObjectHelper; @@ -58,9 +59,8 @@ public class KubernetesServiceCallProcessor extends ServiceSupport implements As private final String uri; private final ExchangePattern exchangePattern; private final KubernetesConfiguration configuration; - private KubernetesServiceDiscovery discovery; - - private ServiceCallLoadBalancer<Server> loadBalancer; + private ServiceCallServerListStrategy<KubernetesServer> serverListStrategy; + private ServiceCallLoadBalancer<KubernetesServer> loadBalancer; private final ServiceCallExpression serviceCallExpression; private SendDynamicProcessor processor; @@ -101,9 +101,9 @@ public class KubernetesServiceCallProcessor extends ServiceSupport implements As @Override public boolean process(Exchange exchange, AsyncCallback callback) { - List<Server> servers = null; + Collection<KubernetesServer> servers = null; try { - servers = discovery.getUpdatedListOfServers(); + servers = serverListStrategy.getUpdatedListOfServers(); if (servers == null || servers.isEmpty()) { exchange.setException(new RejectedExecutionException("No active services with name " + name + " in namespace " + namespace)); } @@ -117,7 +117,7 @@ public class KubernetesServiceCallProcessor extends ServiceSupport implements As } // let the client load balancer chose which server to use - Server server = loadBalancer.chooseServer(servers); + KubernetesServer server = loadBalancer.chooseServer(servers); String ip = server.getIp(); int port = server.getPort(); LOG.debug("Random selected service {} active at server: {}:{}", name, ip, port); @@ -155,14 +155,22 @@ public class KubernetesServiceCallProcessor extends ServiceSupport implements As return "kubernetes"; } - public ServiceCallLoadBalancer<Server> getLoadBalancer() { + public ServiceCallLoadBalancer<KubernetesServer> getLoadBalancer() { return loadBalancer; } - public void setLoadBalancer(ServiceCallLoadBalancer<Server> loadBalancer) { + public void setLoadBalancer(ServiceCallLoadBalancer<KubernetesServer> loadBalancer) { this.loadBalancer = loadBalancer; } + public ServiceCallServerListStrategy getServerListStrategy() { + return serverListStrategy; + } + + public void setServerListStrategy(ServiceCallServerListStrategy serverListStrategy) { + this.serverListStrategy = serverListStrategy; + } + @Override protected void doStart() throws Exception { ObjectHelper.notEmpty(name, "name", this); @@ -172,20 +180,22 @@ public class KubernetesServiceCallProcessor extends ServiceSupport implements As if (loadBalancer == null) { loadBalancer = new RandomLoadBalancer(); } - LOG.info("KubernetesServiceCall at namespace: {} with service name: {} is using load balancer: {}", namespace, name, loadBalancer); + if (serverListStrategy == null) { + serverListStrategy = new KubernetesServiceCallServerListStrategy(name, namespace, null, createKubernetesClient()); + } + LOG.info("KubernetesServiceCall at namespace: {} with service name: {} is using load balancer: {} and service discovery: {}", namespace, name, loadBalancer, serverListStrategy); - discovery = new KubernetesServiceDiscovery(name, namespace, null, createKubernetesClient()); processor = new SendDynamicProcessor(uri, serviceCallExpression); processor.setCamelContext(getCamelContext()); if (exchangePattern != null) { processor.setPattern(exchangePattern); } - ServiceHelper.startServices(discovery, processor); + ServiceHelper.startServices(serverListStrategy, processor); } @Override protected void doStop() throws Exception { - ServiceHelper.stopServices(processor, discovery); + ServiceHelper.stopServices(processor, serverListStrategy); } private OpenShiftClient createKubernetesClient() { http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java new file mode 100644 index 0000000..94fdc43 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.processor; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import io.fabric8.kubernetes.api.model.EndpointAddress; +import io.fabric8.kubernetes.api.model.EndpointPort; +import io.fabric8.kubernetes.api.model.EndpointSubset; +import io.fabric8.kubernetes.api.model.Endpoints; +import io.fabric8.openshift.client.OpenShiftClient; +import org.apache.camel.spi.ServiceCallServerListStrategy; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Discovers where services are running on which servers in Kubernetes. + */ +public class KubernetesServiceCallServerListStrategy extends ServiceSupport implements ServiceCallServerListStrategy<KubernetesServer> { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceCallServerListStrategy.class); + private static final int FIRST = 0; + + private String name; + private String namespace; + private String portName; + private OpenShiftClient client; + + public KubernetesServiceCallServerListStrategy(String name, String namespace, String portName, OpenShiftClient client) { + this.name = name; + this.namespace = namespace; + this.portName = portName; + this.client = client; + } + + @Override + @SuppressWarnings("unchecked") + public Collection<KubernetesServer> getInitialListOfServers() { + return Collections.EMPTY_LIST; + } + + public Collection<KubernetesServer> getUpdatedListOfServers() { + LOG.debug("Discovering endpoints from namespace: {} with name: {}", namespace, name); + Endpoints endpoints = client.endpoints().inNamespace(namespace).withName(name).get(); + List<KubernetesServer> result = new ArrayList<KubernetesServer>(); + if (endpoints != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found {} endpoints in namespace: {} for name: {} and portName: {}", endpoints.getSubsets().size(), namespace, name, portName); + } + for (EndpointSubset subset : endpoints.getSubsets()) { + if (subset.getPorts().size() == 1) { + EndpointPort port = subset.getPorts().get(FIRST); + for (EndpointAddress address : subset.getAddresses()) { + result.add(new KubernetesServer(address.getIp(), port.getPort())); + } + } else { + for (EndpointPort port : subset.getPorts()) { + if (ObjectHelper.isEmpty(portName) || portName.endsWith(port.getName())) { + for (EndpointAddress address : subset.getAddresses()) { + result.add(new KubernetesServer(address.getIp(), port.getPort())); + } + } + } + } + } + } + + return result; + } + + @Override + protected void doStart() throws Exception { + // noop + } + + @Override + protected void doStop() throws Exception { + if (client != null) { + IOHelper.close(client); + } + } + + public String toString() { + return "KubernetesServiceDiscovery"; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java deleted file mode 100644 index 75590f7..0000000 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.kubernetes.processor; - -import java.util.ArrayList; -import java.util.List; - -import io.fabric8.kubernetes.api.model.EndpointAddress; -import io.fabric8.kubernetes.api.model.EndpointPort; -import io.fabric8.kubernetes.api.model.EndpointSubset; -import io.fabric8.kubernetes.api.model.Endpoints; -import io.fabric8.openshift.client.OpenShiftClient; -import org.apache.camel.support.ServiceSupport; -import org.apache.camel.util.IOHelper; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Discovers where services in Kubernetes are running on which servers. - */ -public class KubernetesServiceDiscovery extends ServiceSupport { - - private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceDiscovery.class); - private static final int FIRST = 0; - - private String name; - private String namespace; - private String portName; - private OpenShiftClient client; - - public KubernetesServiceDiscovery(String name, String namespace, String portName, OpenShiftClient client) { - this.name = name; - this.namespace = namespace; - this.portName = portName; - this.client = client; - } - - public List<Server> getUpdatedListOfServers() { - LOG.debug("Discovering endpoints from namespace: {} with name: {}", namespace, name); - Endpoints endpoints = client.endpoints().inNamespace(namespace).withName(name).get(); - List<Server> result = new ArrayList<Server>(); - if (endpoints != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Found {} endpoints in namespace: {} for name: {} and portName: {}", endpoints.getSubsets().size(), namespace, name, portName); - } - for (EndpointSubset subset : endpoints.getSubsets()) { - if (subset.getPorts().size() == 1) { - EndpointPort port = subset.getPorts().get(FIRST); - for (EndpointAddress address : subset.getAddresses()) { - result.add(new Server(address.getIp(), port.getPort())); - } - } else { - for (EndpointPort port : subset.getPorts()) { - if (ObjectHelper.isEmpty(portName) || portName.endsWith(port.getName())) { - for (EndpointAddress address : subset.getAddresses()) { - result.add(new Server(address.getIp(), port.getPort())); - } - } - } - } - } - } - - return result; - } - - @Override - protected void doStart() throws Exception { - // noop - } - - @Override - protected void doStop() throws Exception { - if (client != null) { - IOHelper.close(client); - } - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java index 5724098..1b55e75 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java @@ -23,11 +23,11 @@ import java.util.Random; import org.apache.camel.spi.ServiceCallLoadBalancer; -public class RandomLoadBalancer implements ServiceCallLoadBalancer<Server> { +public class RandomLoadBalancer implements ServiceCallLoadBalancer<KubernetesServer> { @Override - public Server chooseServer(Collection<Server> servers) { - List<Server> list = new ArrayList<>(servers); + public KubernetesServer chooseServer(Collection<KubernetesServer> servers) { + List<KubernetesServer> list = new ArrayList<>(servers); int size = list.size(); int ran = new Random().nextInt(size); return list.get(ran); http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java index 2a2a401..1cfa86d 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java @@ -22,13 +22,13 @@ import java.util.List; import org.apache.camel.spi.ServiceCallLoadBalancer; -public class RoundRobinBalancer implements ServiceCallLoadBalancer<Server> { +public class RoundRobinBalancer implements ServiceCallLoadBalancer<KubernetesServer> { private int counter = -1; @Override - public Server chooseServer(Collection<Server> servers) { - List<Server> list = new ArrayList<>(servers); + public KubernetesServer chooseServer(Collection<KubernetesServer> servers) { + List<KubernetesServer> list = new ArrayList<>(servers); int size = list.size(); if (++counter >= size) { http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java deleted file mode 100644 index 6a6a913..0000000 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.kubernetes.processor; - -/** - * Represents a model of a kubernetes server. - */ -public final class Server { - - private final String ip; - private final int port; - - public Server(String ip, int port) { - this.ip = ip; - this.port = port; - } - - public String getIp() { - return ip; - } - - public int getPort() { - return port; - } -}