CAMEL-9683: Kubernetes can look up service using client, env or dns.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4eb76329 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4eb76329 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4eb76329 Branch: refs/heads/remoteServiceCall Commit: 4eb76329ca6b859170253c421cc4c84601521272 Parents: 1f58e58 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed May 18 15:15:19 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 23 09:27:57 2016 +0200 ---------------------------------------------------------------------- .../support/ServiceCallExpressionSupport.java | 2 +- .../KubernetesDnsServiceCallExpression.java | 81 ++++++++++ .../KubernetesDnsServiceCallProcessor.java | 152 +++++++++++++++++++ .../processor/KubernetesProcessorFactory.java | 12 +- 4 files changed, 242 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4eb76329/camel-core/src/main/java/org/apache/camel/support/ServiceCallExpressionSupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/support/ServiceCallExpressionSupport.java b/camel-core/src/main/java/org/apache/camel/support/ServiceCallExpressionSupport.java index 73e0e72..9901038 100644 --- a/camel-core/src/main/java/org/apache/camel/support/ServiceCallExpressionSupport.java +++ b/camel-core/src/main/java/org/apache/camel/support/ServiceCallExpressionSupport.java @@ -94,7 +94,7 @@ public abstract class ServiceCallExpressionSupport extends ExpressionAdapter { } } - LOG.debug("Camel endpoint uri: {} for calling service: {} + on server {}:{}", answer, name, ip, port); + LOG.debug("Camel endpoint uri: {} for calling service: {} on server {}:{}", answer, name, ip, port); return answer; } 
http://git-wip-us.apache.org/repos/asf/camel/blob/4eb76329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallExpression.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallExpression.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallExpression.java new file mode 100644 index 0000000..d5e3751 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallExpression.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kubernetes.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.support.ExpressionAdapter; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KubernetesDnsServiceCallExpression extends ExpressionAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesDnsServiceCallExpression.class); + + private final String name; + private final String namespace; + private final String scheme; + private final String contextPath; + private final String uri; + private final String dnsDomain; + + public KubernetesDnsServiceCallExpression(String name, String namespace, String scheme, String contextPath, String uri, String dnsDomain) { + this.name = name; + this.namespace = namespace; + this.scheme = scheme; + this.contextPath = contextPath; + this.uri = uri; + this.dnsDomain = dnsDomain; + } + + @Override + public Object evaluate(Exchange exchange) { + try { + return buildCamelEndpointUri(name, namespace, uri, contextPath, scheme, dnsDomain); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + protected static String buildCamelEndpointUri(String name, String namespace, String uri, String contextPath, String scheme, String dnsDomain) { + // build basic uri if none provided + String answer = uri; + if (answer == null) { + if (scheme == null) { + // use http by default if no scheme has been configured + scheme = "http"; + } + answer = scheme + "://" + asKubernetesDnsServicePart(name, namespace, dnsDomain); + if (contextPath != null) { + answer += "/" + contextPath; + } + } else { + // we have existing uri, then replace the serviceName with name.namespace.svc.dnsDomain + if (answer.contains(name)) { + answer = answer.replaceFirst(name, asKubernetesDnsServicePart(name, namespace, dnsDomain)); + } + } + + LOG.debug("Camel endpoint uri: {} for calling service: {}", answer, name); + return answer; + } + + 
protected static String asKubernetesDnsServicePart(String name, String namespace, String dnsDomain) { + return name + "." + namespace + ".svc." + dnsDomain; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/4eb76329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallProcessor.java new file mode 100644 index 0000000..46a4c7f --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallProcessor.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kubernetes.processor; + +import java.util.concurrent.RejectedExecutionException; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Traceable; +import org.apache.camel.component.kubernetes.KubernetesConstants; +import org.apache.camel.processor.SendDynamicProcessor; +import org.apache.camel.spi.IdAware; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Kubernetes based implementation of the the ServiceCall EIP where the service lookup is environment variable based. + */ +public class KubernetesDnsServiceCallProcessor extends ServiceSupport implements AsyncProcessor, CamelContextAware, Traceable, IdAware { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesClientServiceCallProcessor.class); + + private CamelContext camelContext; + private String id; + private final String name; + private final String scheme; + private final String contextPath; + private final String namespace; + private final String uri; + private final String dnsDomain; + private final ExchangePattern exchangePattern; + private final KubernetesServiceCallExpression serviceCallExpression; + private SendDynamicProcessor processor; + private String ip; + private long port; + + public KubernetesDnsServiceCallProcessor(String name, String namespace, String uri, ExchangePattern exchangePattern, String dnsDomain) { + // setup from the provided name which can contain scheme and context-path information as well + String serviceName; + if (name.contains("/")) { + serviceName = ObjectHelper.before(name, "/"); + this.contextPath = 
ObjectHelper.after(name, "/"); + } else if (name.contains("?")) { + serviceName = ObjectHelper.before(name, "?"); + this.contextPath = ObjectHelper.after(name, "?"); + } else { + serviceName = name; + this.contextPath = null; + } + if (serviceName.contains(":")) { + this.scheme = ObjectHelper.before(serviceName, ":"); + this.name = ObjectHelper.after(serviceName, ":"); + } else { + this.scheme = null; + this.name = serviceName; + } + + this.namespace = namespace; + this.uri = uri; + this.exchangePattern = exchangePattern; + this.dnsDomain = dnsDomain; + this.serviceCallExpression = new KubernetesServiceCallExpression(this.name, this.scheme, this.contextPath, this.uri); + } + + @Override + public void process(Exchange exchange) throws Exception { + AsyncProcessorHelper.process(this, exchange); + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + // use a + + LOG.debug("Service {} active at server: {}:{}", name, ip, port); + + // set selected server as header + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_SERVER_IP, ip); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_SERVER_PORT, port); + + // use the dynamic send processor to call the service + return processor.process(exchange, callback); + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public String getId() { + return id; + } + + @Override + public void setId(String id) { + this.id = id; + } + + @Override + public String getTraceLabel() { + return "kubernetes"; + } + + @Override + protected void doStart() throws Exception { + ObjectHelper.notEmpty(name, "name", this); + ObjectHelper.notEmpty(namespace, "namespace", this); + ObjectHelper.notEmpty(dnsDomain, "dnsDomain", this); + + LOG.info("KubernetesServiceCall at namespace: {} with service name: {} using DNS domain {}", namespace, name, 
dnsDomain); + + processor = new SendDynamicProcessor(uri, serviceCallExpression); + processor.setCamelContext(getCamelContext()); + if (exchangePattern != null) { + processor.setPattern(exchangePattern); + } + ServiceHelper.startServices(processor); + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopServices(processor); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/4eb76329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java index 4be66c6..6fb695b 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java @@ -126,11 +126,15 @@ public class KubernetesProcessorFactory implements ProcessorFactory { processor.setLoadBalancer(lb); processor.setServerListStrategy(sl); return processor; - } else if ("environment".equals(lookup)) { - return new KubernetesEnvironmentServiceCallProcessor(name, namespace, uri, mep); + } else if ("dns".equals(lookup)) { + String dnsDomain = config != null ? config.getDnsDomain() : null; + if (dnsDomain == null && configRef != null) { + dnsDomain = configRef.getDnsDomain(); + } + return new KubernetesDnsServiceCallProcessor(name, namespace, uri, mep, dnsDomain); } else { -// return new KubernetesDnsServiceCallProcessor(name, namespace, uri, mep); - return null; + // environment is default + return new KubernetesEnvironmentServiceCallProcessor(name, namespace, uri, mep); } } else { return null;