CAMEL-9683: A new toService EIP that uses client discovery to look up live 
services and pick a service IP/port to use when calling the service from a Camel 
route. Allows plugging in different providers.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8577b501
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8577b501
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8577b501

Branch: refs/heads/remoteServiceCall
Commit: 8577b501a7265276b9df917b196536840a65da28
Parents: 02ec0d4
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue Apr 26 14:22:28 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon May 23 09:25:01 2016 +0200

----------------------------------------------------------------------
 .../ServiceCallConfigurationDefinition.java     |  37 +++++++
 .../camel/model/ServiceCallDefinition.java      |  37 +++++++
 .../camel/spi/ServiceCallLoadBalancer.java      |   6 +-
 .../org/apache/camel/spi/ServiceCallServer.java |  36 +++++++
 .../spi/ServiceCallServerListStrategy.java      |  45 ++++++++
 .../processor/KubernetesProcessorFactory.java   |  33 ++++++
 .../kubernetes/processor/KubernetesServer.java  |  41 +++++++
 .../KubernetesServiceCallProcessor.java         |  36 ++++---
 ...KubernetesServiceCallServerListStrategy.java | 106 +++++++++++++++++++
 .../processor/KubernetesServiceDiscovery.java   |  93 ----------------
 .../processor/RandomLoadBalancer.java           |   6 +-
 .../processor/RoundRobinBalancer.java           |   6 +-
 .../component/kubernetes/processor/Server.java  |  39 -------
 13 files changed, 368 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java
 
b/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java
index e84f554..cda3338 100644
--- 
a/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java
+++ 
b/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java
@@ -24,6 +24,7 @@ import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.ServiceCallLoadBalancer;
+import org.apache.camel.spi.ServiceCallServerListStrategy;
 
 @Metadata(label = "eip,routing")
 @XmlRootElement(name = "toServiceConfiguration")
@@ -66,6 +67,10 @@ public class ServiceCallConfigurationDefinition extends 
IdentifiedType {
     private String loadBalancerRef;
     @XmlTransient
     private ServiceCallLoadBalancer loadBalancer;
+    @XmlAttribute
+    private String serverListStrategyRef;
+    @XmlTransient
+    private ServiceCallServerListStrategy serverListStrategy;
 
     public ServiceCallConfigurationDefinition() {
     }
@@ -213,6 +218,22 @@ public class ServiceCallConfigurationDefinition extends 
IdentifiedType {
         this.loadBalancer = loadBalancer;
     }
 
+    public String getServerListStrategyRef() {
+        return serverListStrategyRef;
+    }
+
+    public void setServerListStrategyRef(String serverListStrategyRef) {
+        this.serverListStrategyRef = serverListStrategyRef;
+    }
+
+    public ServiceCallServerListStrategy getServerListStrategy() {
+        return serverListStrategy;
+    }
+
+    public void setServerListStrategy(ServiceCallServerListStrategy 
serverListStrategy) {
+        this.serverListStrategy = serverListStrategy;
+    }
+
     // Fluent API
     // 
-------------------------------------------------------------------------
 
@@ -353,6 +374,22 @@ public class ServiceCallConfigurationDefinition extends 
IdentifiedType {
     }
 
     /**
+     * Sets a reference to a custom {@link 
org.apache.camel.spi.ServiceCallServerListStrategy} to use.
+     */
+    public ServiceCallConfigurationDefinition serverListStrategy(String 
serverListStrategyRef) {
+        setServerListStrategyRef(serverListStrategyRef);
+        return this;
+    }
+
+    /**
+     * Sets a custom {@link 
org.apache.camel.spi.ServiceCallServerListStrategy} to use.
+     */
+    public ServiceCallConfigurationDefinition 
serverListStrategy(ServiceCallServerListStrategy serverListStrategy) {
+        setServerListStrategy(serverListStrategy);
+        return this;
+    }
+
+    /**
      * End of configuration
      */
     public ServiceCallDefinition end() {

http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java
index d1c154d..1bfb0b5 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ServiceCallDefinition.java
@@ -28,6 +28,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.ServiceCallLoadBalancer;
+import org.apache.camel.spi.ServiceCallServerListStrategy;
 
 @Metadata(label = "eip,routing")
 @XmlRootElement(name = "serviceCall")
@@ -52,6 +53,10 @@ public class ServiceCallDefinition extends 
NoOutputDefinition<ServiceCallDefinit
     private String loadBalancerRef;
     @XmlTransient
     private ServiceCallLoadBalancer loadBalancer;
+    @XmlAttribute
+    private String serverListStrategyRef;
+    @XmlTransient
+    private ServiceCallServerListStrategy serverListStrategy;
 
     public ServiceCallDefinition() {
     }
@@ -150,6 +155,22 @@ public class ServiceCallDefinition extends 
NoOutputDefinition<ServiceCallDefinit
         return this;
     }
 
+    /**
+     * Sets a reference to a custom {@link 
org.apache.camel.spi.ServiceCallServerListStrategy} to use.
+     */
+    public ServiceCallDefinition serverListStrategy(String 
serverListStrategyRef) {
+        setServerListStrategyRef(serverListStrategyRef);
+        return this;
+    }
+
+    /**
+     * Sets a custom {@link 
org.apache.camel.spi.ServiceCallServerListStrategy} to use.
+     */
+    public ServiceCallDefinition 
serverListStrategy(ServiceCallServerListStrategy serverListStrategy) {
+        setServerListStrategy(serverListStrategy);
+        return this;
+    }
+
     // Properties
     // 
-------------------------------------------------------------------------
 
@@ -228,4 +249,20 @@ public class ServiceCallDefinition extends 
NoOutputDefinition<ServiceCallDefinit
     public void setLoadBalancer(ServiceCallLoadBalancer loadBalancer) {
         this.loadBalancer = loadBalancer;
     }
+
+    public String getServerListStrategyRef() {
+        return serverListStrategyRef;
+    }
+
+    public void setServerListStrategyRef(String serverListStrategyRef) {
+        this.serverListStrategyRef = serverListStrategyRef;
+    }
+
+    public ServiceCallServerListStrategy getServerListStrategy() {
+        return serverListStrategy;
+    }
+
+    public void setServerListStrategy(ServiceCallServerListStrategy 
serverListStrategy) {
+        this.serverListStrategy = serverListStrategy;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java 
b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java
index c7b05db..908fcbb 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallLoadBalancer.java
@@ -19,9 +19,11 @@ package org.apache.camel.spi;
 import java.util.Collection;
 
 /**
- * Allows SPIs to implement custom load balacing strategies for the Service 
Call EIP.
+ * Allows SPIs to implement custom load balancing strategies for the Service 
Call EIP.
+ *
+ * @see ServiceCallServerListStrategy
  */
-public interface ServiceCallLoadBalancer<T> {
+public interface ServiceCallLoadBalancer<T extends ServiceCallServer> {
 
     /**
      * Chooses one of the servers to use using the implemented strategy.

http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServer.java 
b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServer.java
new file mode 100644
index 0000000..1579599
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+/**
+ * Represents a server that hosts a service for the Service Call EIP.
+ *
+ * @see ServiceCallLoadBalancer
+ * @see ServiceCallServerListStrategy
+ */
+public interface ServiceCallServer {
+
+    /**
+     * Gets the IP or hostname of the server hosting the service
+     */
+    String getIp();
+
+    /**
+     * Gets the port number of the server hosting the service
+     */
+    int getPort();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java
new file mode 100644
index 0000000..681a662
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.Collection;
+
+/**
+ * Allows SPIs to implement custom server list strategies for the Service Call 
EIP.
+ *
+ * @see ServiceCallLoadBalancer
+ * @see ServiceCallServer
+ */
+public interface ServiceCallServerListStrategy<T extends ServiceCallServer> {
+
+    /**
+     * Gets the initial list of servers.
+     * <p/>
+     * This method may return <tt>null</tt> or an empty list.
+     */
+    Collection<T> getInitialListOfServers();
+
+    /**
+     * Gets the updated list of servers.
+     * <p/>
+     * This method can either be called on-demand prior to a service call, or 
have
+     * a background job that is scheduled to update the list, or a watcher
+     * that triggers when the list of servers changes.
+     */
+    Collection<T> getUpdatedListOfServers();
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
index 05d979b..7ecc6b2 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
@@ -28,6 +28,7 @@ import org.apache.camel.model.ServiceCallDefinition;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.ServiceCallLoadBalancer;
+import org.apache.camel.spi.ServiceCallServerListStrategy;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.IntrospectionSupport;
 
@@ -90,8 +91,18 @@ public class KubernetesProcessorFactory implements 
ProcessorFactory {
                 lb = configureLoadBalancer(routeContext, configRef);
             }
 
+            // lookup the server list strategy to use (configured on EIP takes 
precedence vs configured on configuration)
+            ServiceCallServerListStrategy sl = 
configureServerListStrategy(routeContext, sc);
+            if (sl == null && config != null) {
+                sl = configureServerListStrategy(routeContext, config);
+            }
+            if (sl == null && configRef != null) {
+                sl = configureServerListStrategy(routeContext, configRef);
+            }
+
             KubernetesServiceCallProcessor processor = new 
KubernetesServiceCallProcessor(name, namespace, uri, mep, kc);
             processor.setLoadBalancer(lb);
+            processor.setServerListStrategy(sl);
             return processor;
         } else {
             return null;
@@ -135,4 +146,26 @@ public class KubernetesProcessorFactory implements 
ProcessorFactory {
         return lb;
     }
 
+    private ServiceCallServerListStrategy 
configureServerListStrategy(RouteContext routeContext, ServiceCallDefinition 
sd) {
+        ServiceCallServerListStrategy lb = null;
+
+        if (sd != null) {
+            lb = sd.getServerListStrategy();
+            if (lb == null && sd.getServerListStrategyRef() != null) {
+                lb = 
CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), 
sd.getServerListStrategyRef(), ServiceCallServerListStrategy.class);
+            }
+        }
+
+        return lb;
+    }
+
+    private ServiceCallServerListStrategy 
configureServerListStrategy(RouteContext routeContext, 
ServiceCallConfigurationDefinition config) {
+        ServiceCallServerListStrategy lb = config.getServerListStrategy();
+        if (lb == null && config.getServerListStrategyRef() != null) {
+            String ref = config.getServerListStrategyRef();
+            lb = 
CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), ref, 
ServiceCallServerListStrategy.class);
+        }
+        return lb;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServer.java
new file mode 100644
index 0000000..007e5c8
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import org.apache.camel.spi.ServiceCallServer;
+
+/**
+ * Represents a model of a kubernetes server.
+ */
+public final class KubernetesServer implements ServiceCallServer {
+
+    private final String ip;
+    private final int port;
+
+    public KubernetesServer(String ip, int port) {
+        this.ip = ip;
+        this.port = port;
+    }
+
+    public String getIp() {
+        return ip;
+    }
+
+    public int getPort() {
+        return port;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
index ff859d6..8900017 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
@@ -16,7 +16,7 @@
  */
 package org.apache.camel.component.kubernetes.processor;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.concurrent.RejectedExecutionException;
 
 import io.fabric8.kubernetes.client.Config;
@@ -35,6 +35,7 @@ import 
org.apache.camel.component.kubernetes.KubernetesConstants;
 import org.apache.camel.processor.SendDynamicProcessor;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.ServiceCallLoadBalancer;
+import org.apache.camel.spi.ServiceCallServerListStrategy;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -58,9 +59,8 @@ public class KubernetesServiceCallProcessor extends 
ServiceSupport implements As
     private final String uri;
     private final ExchangePattern exchangePattern;
     private final KubernetesConfiguration configuration;
-    private KubernetesServiceDiscovery discovery;
-
-    private ServiceCallLoadBalancer<Server> loadBalancer;
+    private ServiceCallServerListStrategy<KubernetesServer> serverListStrategy;
+    private ServiceCallLoadBalancer<KubernetesServer> loadBalancer;
     private final ServiceCallExpression serviceCallExpression;
     private SendDynamicProcessor processor;
 
@@ -101,9 +101,9 @@ public class KubernetesServiceCallProcessor extends 
ServiceSupport implements As
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        List<Server> servers = null;
+        Collection<KubernetesServer> servers = null;
         try {
-            servers = discovery.getUpdatedListOfServers();
+            servers = serverListStrategy.getUpdatedListOfServers();
             if (servers == null || servers.isEmpty()) {
                 exchange.setException(new RejectedExecutionException("No 
active services with name " + name + " in namespace " + namespace));
             }
@@ -117,7 +117,7 @@ public class KubernetesServiceCallProcessor extends 
ServiceSupport implements As
         }
 
         // let the client load balancer chose which server to use
-        Server server = loadBalancer.chooseServer(servers);
+        KubernetesServer server = loadBalancer.chooseServer(servers);
         String ip = server.getIp();
         int port = server.getPort();
         LOG.debug("Random selected service {} active at server: {}:{}", name, 
ip, port);
@@ -155,14 +155,22 @@ public class KubernetesServiceCallProcessor extends 
ServiceSupport implements As
         return "kubernetes";
     }
 
-    public ServiceCallLoadBalancer<Server> getLoadBalancer() {
+    public ServiceCallLoadBalancer<KubernetesServer> getLoadBalancer() {
         return loadBalancer;
     }
 
-    public void setLoadBalancer(ServiceCallLoadBalancer<Server> loadBalancer) {
+    public void setLoadBalancer(ServiceCallLoadBalancer<KubernetesServer> 
loadBalancer) {
         this.loadBalancer = loadBalancer;
     }
 
+    public ServiceCallServerListStrategy getServerListStrategy() {
+        return serverListStrategy;
+    }
+
+    public void setServerListStrategy(ServiceCallServerListStrategy 
serverListStrategy) {
+        this.serverListStrategy = serverListStrategy;
+    }
+
     @Override
     protected void doStart() throws Exception {
         ObjectHelper.notEmpty(name, "name", this);
@@ -172,20 +180,22 @@ public class KubernetesServiceCallProcessor extends 
ServiceSupport implements As
         if (loadBalancer == null) {
             loadBalancer = new RandomLoadBalancer();
         }
-        LOG.info("KubernetesServiceCall at namespace: {} with service name: {} 
is using load balancer: {}", namespace, name, loadBalancer);
+        if (serverListStrategy == null) {
+            serverListStrategy = new 
KubernetesServiceCallServerListStrategy(name, namespace, null, 
createKubernetesClient());
+        }
+        LOG.info("KubernetesServiceCall at namespace: {} with service name: {} 
is using load balancer: {} and service discovery: {}", namespace, name, 
loadBalancer, serverListStrategy);
 
-        discovery = new KubernetesServiceDiscovery(name, namespace, null, 
createKubernetesClient());
         processor = new SendDynamicProcessor(uri, serviceCallExpression);
         processor.setCamelContext(getCamelContext());
         if (exchangePattern != null) {
             processor.setPattern(exchangePattern);
         }
-        ServiceHelper.startServices(discovery, processor);
+        ServiceHelper.startServices(serverListStrategy, processor);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(processor, discovery);
+        ServiceHelper.stopServices(processor, serverListStrategy);
     }
 
     private OpenShiftClient createKubernetesClient() {

http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java
new file mode 100644
index 0000000..94fdc43
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import io.fabric8.kubernetes.api.model.EndpointAddress;
+import io.fabric8.kubernetes.api.model.EndpointPort;
+import io.fabric8.kubernetes.api.model.EndpointSubset;
+import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.openshift.client.OpenShiftClient;
+import org.apache.camel.spi.ServiceCallServerListStrategy;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Discovers where services are running on which servers in Kubernetes.
+ */
+public class KubernetesServiceCallServerListStrategy extends ServiceSupport 
implements ServiceCallServerListStrategy<KubernetesServer> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesServiceCallServerListStrategy.class);
+    private static final int FIRST = 0;
+
+    private String name;
+    private String namespace;
+    private String portName;
+    private OpenShiftClient client;
+
+    public KubernetesServiceCallServerListStrategy(String name, String 
namespace, String portName, OpenShiftClient client) {
+        this.name = name;
+        this.namespace = namespace;
+        this.portName = portName;
+        this.client = client;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Collection<KubernetesServer> getInitialListOfServers() {
+        return Collections.EMPTY_LIST;
+    }
+
+    public Collection<KubernetesServer> getUpdatedListOfServers() {
+        LOG.debug("Discovering endpoints from namespace: {} with name: {}", 
namespace, name);
+        Endpoints endpoints = 
client.endpoints().inNamespace(namespace).withName(name).get();
+        List<KubernetesServer> result = new ArrayList<KubernetesServer>();
+        if (endpoints != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Found {} endpoints in namespace: {} for name: {} 
and portName: {}", endpoints.getSubsets().size(), namespace, name, portName);
+            }
+            for (EndpointSubset subset : endpoints.getSubsets()) {
+                if (subset.getPorts().size() == 1) {
+                    EndpointPort port = subset.getPorts().get(FIRST);
+                    for (EndpointAddress address : subset.getAddresses()) {
+                        result.add(new KubernetesServer(address.getIp(), 
port.getPort()));
+                    }
+                } else {
+                    for (EndpointPort port : subset.getPorts()) {
+                        if (ObjectHelper.isEmpty(portName) || 
portName.endsWith(port.getName())) {
+                            for (EndpointAddress address : 
subset.getAddresses()) {
+                                result.add(new 
KubernetesServer(address.getIp(), port.getPort()));
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (client != null) {
+            IOHelper.close(client);
+        }
+    }
+
+    public String toString() {
+        return "KubernetesServiceDiscovery";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
deleted file mode 100644
index 75590f7..0000000
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.processor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import io.fabric8.kubernetes.api.model.EndpointAddress;
-import io.fabric8.kubernetes.api.model.EndpointPort;
-import io.fabric8.kubernetes.api.model.EndpointSubset;
-import io.fabric8.kubernetes.api.model.Endpoints;
-import io.fabric8.openshift.client.OpenShiftClient;
-import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Discovers where services in Kubernetes are running on which servers.
- */
-public class KubernetesServiceDiscovery extends ServiceSupport {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesServiceDiscovery.class);
-    private static final int FIRST = 0;
-
-    private String name;
-    private String namespace;
-    private String portName;
-    private OpenShiftClient client;
-
-    public KubernetesServiceDiscovery(String name, String namespace, String 
portName, OpenShiftClient client) {
-        this.name = name;
-        this.namespace = namespace;
-        this.portName = portName;
-        this.client = client;
-    }
-
-    public List<Server> getUpdatedListOfServers() {
-        LOG.debug("Discovering endpoints from namespace: {} with name: {}", 
namespace, name);
-        Endpoints endpoints = 
client.endpoints().inNamespace(namespace).withName(name).get();
-        List<Server> result = new ArrayList<Server>();
-        if (endpoints != null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Found {} endpoints in namespace: {} for name: {} 
and portName: {}", endpoints.getSubsets().size(), namespace, name, portName);
-            }
-            for (EndpointSubset subset : endpoints.getSubsets()) {
-                if (subset.getPorts().size() == 1) {
-                    EndpointPort port = subset.getPorts().get(FIRST);
-                    for (EndpointAddress address : subset.getAddresses()) {
-                        result.add(new Server(address.getIp(), 
port.getPort()));
-                    }
-                } else {
-                    for (EndpointPort port : subset.getPorts()) {
-                        if (ObjectHelper.isEmpty(portName) || 
portName.endsWith(port.getName())) {
-                            for (EndpointAddress address : 
subset.getAddresses()) {
-                                result.add(new Server(address.getIp(), 
port.getPort()));
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        return result;
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        // noop
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (client != null) {
-            IOHelper.close(client);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java
index 5724098..1b55e75 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java
@@ -23,11 +23,11 @@ import java.util.Random;
 
 import org.apache.camel.spi.ServiceCallLoadBalancer;
 
-public class RandomLoadBalancer implements ServiceCallLoadBalancer<Server> {
+public class RandomLoadBalancer implements ServiceCallLoadBalancer<KubernetesServer> {
 
     @Override
-    public Server chooseServer(Collection<Server> servers) {
-        List<Server> list = new ArrayList<>(servers);
+    public KubernetesServer chooseServer(Collection<KubernetesServer> servers) {
+        List<KubernetesServer> list = new ArrayList<>(servers);
         int size = list.size();
         int ran = new Random().nextInt(size);
         return list.get(ran);

http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java
index 2a2a401..1cfa86d 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RoundRobinBalancer.java
@@ -22,13 +22,13 @@ import java.util.List;
 
 import org.apache.camel.spi.ServiceCallLoadBalancer;
 
-public class RoundRobinBalancer implements ServiceCallLoadBalancer<Server> {
+public class RoundRobinBalancer implements ServiceCallLoadBalancer<KubernetesServer> {
 
     private int counter = -1;
 
     @Override
-    public Server chooseServer(Collection<Server> servers) {
-        List<Server> list = new ArrayList<>(servers);
+    public KubernetesServer chooseServer(Collection<KubernetesServer> servers) {
+        List<KubernetesServer> list = new ArrayList<>(servers);
 
         int size = list.size();
         if (++counter >= size) {

http://git-wip-us.apache.org/repos/asf/camel/blob/8577b501/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java
deleted file mode 100644
index 6a6a913..0000000
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.processor;
-
-/**
- * Represents a model of a kubernetes server.
- */
-public final class Server {
-
-    private final String ip;
-    private final int port;
-
-    public Server(String ip, int port) {
-        this.ip = ip;
-        this.port = port;
-    }
-
-    public String getIp() {
-        return ip;
-    }
-
-    public int getPort() {
-        return port;
-    }
-}

Reply via email to